Re: Re: Re: flink sql回撤流sink优化问题

2022-01-06 文章 Benchao Li
mini-batch对aggregate算子是有效的,开启了之后它的输出会降低一些,从而降低了sink的输出压力。

casel.chen  于2022年1月7日周五 07:42写道:

> mini-batch优化针对sink算子也有效吗?我是直接aggregate without window然后将聚合结果输出到sink算子。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2022-01-06 20:43:00,"Benchao Li"  写道:
> >这个问题可以用mini-batch[1]来解决呀
> >
> >[1]
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation
> >
> >casel.chen  于2021年12月26日周日 18:01写道:
> >
> >> 你说的是upsert-kafka的这两个参数吗?
> >>
> >> sink.buffer-flush.max-rows
> >> sink.buffer-flush.interval
> >> 确实能达到我想要的效果,但是会引入额外的kafka sink,另外还是从sink
> >> kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2021-12-25 22:54:19,"郭伟权"  写道:
> >>
> >>
> >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量
> >> >
> >> >casel.chen  于2021年12月23日周四 08:15写道:
> >> >
> >> >> flink sql中aggregate without
> >> >>
> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
> >> >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
> >> >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?
> >> >>
> >> >>
> >> >> 例如有下面binlog cdc购买数据(订单购买金额会更新):
> >> >>
> >> >> orderid.   categorydt
> amt
> >> >>
> >> >> 订单id 商品类型   购买时间(MMddHH)  购买金额
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
> >> >>
> >> >>
> >> >>
> >> >> INSERT INTO mysql_sink_table
> >> >>
> >> >> SELECT category, dt, LAST_VALUE(total)
> >> >>
> >> >> OVER (
> >> >>
> >> >>   PARTITION BY category
> >> >>
> >> >>   ORDER BY PROCTIME()
> >> >>
> >> >>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
> >> >>
> >> >> ) AS var1
> >> >>
> >> >> FROM (
> >> >>
> >> >>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category,
> dt
> >> >>
> >> >> );
> >>
> >
> >
> >--
> >
> >Best,
> >Benchao Li
>


-- 

Best,
Benchao Li


Re:Re: Re: flink sql回撤流sink优化问题

2022-01-06 文章 casel.chen
mini-batch优化针对sink算子也有效吗?我是直接aggregate without window然后将聚合结果输出到sink算子。

















在 2022-01-06 20:43:00,"Benchao Li"  写道:
>这个问题可以用mini-batch[1]来解决呀
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation
>
>casel.chen  于2021年12月26日周日 18:01写道:
>
>> 你说的是upsert-kafka的这两个参数吗?
>>
>> sink.buffer-flush.max-rows
>> sink.buffer-flush.interval
>> 确实能达到我想要的效果,但是会引入额外的kafka sink,另外还是从sink
>> kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2021-12-25 22:54:19,"郭伟权"  写道:
>>
>> >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量
>> >
>> >casel.chen  于2021年12月23日周四 08:15写道:
>> >
>> >> flink sql中aggregate without
>> >> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
>> >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
>> >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?
>> >>
>> >>
>> >> 例如有下面binlog cdc购买数据(订单购买金额会更新):
>> >>
>> >> orderid.   categorydt  amt
>> >>
>> >> 订单id 商品类型   购买时间(MMddHH)  购买金额
>> >>
>> >>
>> >>
>> >>
>> >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
>> >>
>> >>
>> >>
>> >> INSERT INTO mysql_sink_table
>> >>
>> >> SELECT category, dt, LAST_VALUE(total)
>> >>
>> >> OVER (
>> >>
>> >>   PARTITION BY category
>> >>
>> >>   ORDER BY PROCTIME()
>> >>
>> >>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
>> >>
>> >> ) AS var1
>> >>
>> >> FROM (
>> >>
>> >>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt
>> >>
>> >> );
>>
>
>
>-- 
>
>Best,
>Benchao Li


Re: Re: flink sql回撤流sink优化问题

2022-01-06 文章 Benchao Li
这个问题可以用mini-batch[1]来解决呀

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation

casel.chen  于2021年12月26日周日 18:01写道:

> 你说的是upsert-kafka的这两个参数吗?
>
> sink.buffer-flush.max-rows
> sink.buffer-flush.interval
> 确实能达到我想要的效果,但是会引入额外的kafka sink,另外还是从sink
> kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-12-25 22:54:19,"郭伟权"  写道:
>
> >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量
> >
> >casel.chen  于2021年12月23日周四 08:15写道:
> >
> >> flink sql中aggregate without
> >> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
> >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
> >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?
> >>
> >>
> >> 例如有下面binlog cdc购买数据(订单购买金额会更新):
> >>
> >> orderid.   categorydt  amt
> >>
> >> 订单id 商品类型   购买时间(MMddHH)  购买金额
> >>
> >>
> >>
> >>
> >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
> >>
> >>
> >>
> >> INSERT INTO mysql_sink_table
> >>
> >> SELECT category, dt, LAST_VALUE(total)
> >>
> >> OVER (
> >>
> >>   PARTITION BY category
> >>
> >>   ORDER BY PROCTIME()
> >>
> >>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
> >>
> >> ) AS var1
> >>
> >> FROM (
> >>
> >>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt
> >>
> >> );
>


-- 

Best,
Benchao Li


Re:Re: flink sql回撤流sink优化问题

2021-12-26 文章 casel.chen
你说的是upsert-kafka的这两个参数吗?

sink.buffer-flush.max-rows
sink.buffer-flush.interval
确实能达到我想要的效果,但是会引入额外的kafka sink,另外还是从sink 
kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。

















在 2021-12-25 22:54:19,"郭伟权"  写道:
>结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量
>
>casel.chen  于2021年12月23日周四 08:15写道:
>
>> flink sql中aggregate without
>> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
>> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
>> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?
>>
>>
>> 例如有下面binlog cdc购买数据(订单购买金额会更新):
>>
>> orderid.   categorydt  amt
>>
>> 订单id 商品类型   购买时间(MMddHH)  购买金额
>>
>>
>>
>>
>> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
>>
>>
>>
>> INSERT INTO mysql_sink_table
>>
>> SELECT category, dt, LAST_VALUE(total)
>>
>> OVER (
>>
>>   PARTITION BY category
>>
>>   ORDER BY PROCTIME()
>>
>>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
>>
>> ) AS var1
>>
>> FROM (
>>
>>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt
>>
>> );


Re:Re: flink sql回撤流sink优化问题

2021-12-26 文章 casel.chen
jdbc sink的buffer-flush不会减少写入的数据量,只是变成微批写入而已,mysql写入的压力并没有减少。
而我想要实现的效果是会减少写的数据量,因为同一个key的数据被聚合成最后一条。

















在 2021-12-26 09:43:47,"Zhiwen Sun"  写道:
>不用那么复杂,正常的 insert select group by 即可, 一分钟写一次 mysql 就行。
>
>参考 JDBC sink [1] 中的 sink.buffer-flush.interval 和 sink.buffer-flush.max-rows
>参数
>
>[1] :
>https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/
>
>
>Zhiwen Sun
>
>
>
>On Thu, Dec 23, 2021 at 8:15 AM casel.chen  wrote:
>
>> flink sql中aggregate without
>> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
>> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
>> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?
>>
>>
>> 例如有下面binlog cdc购买数据(订单购买金额会更新):
>>
>> orderid.   categorydt  amt
>>
>> 订单id 商品类型   购买时间(MMddHH)  购买金额
>>
>>
>>
>>
>> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
>>
>>
>>
>> INSERT INTO mysql_sink_table
>>
>> SELECT category, dt, LAST_VALUE(total)
>>
>> OVER (
>>
>>   PARTITION BY category
>>
>>   ORDER BY PROCTIME()
>>
>>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
>>
>> ) AS var1
>>
>> FROM (
>>
>>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt
>>
>> );


Re: flink sql回撤流sink优化问题

2021-12-25 文章 Zhiwen Sun
不用那么复杂,正常的 insert select group by 即可, 一分钟写一次 mysql 就行。

参考 JDBC sink [1] 中的 sink.buffer-flush.interval 和 sink.buffer-flush.max-rows
参数

[1] :
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/


Zhiwen Sun



On Thu, Dec 23, 2021 at 8:15 AM casel.chen  wrote:

> flink sql中aggregate without
> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?
>
>
> 例如有下面binlog cdc购买数据(订单购买金额会更新):
>
> orderid.   categorydt  amt
>
> 订单id 商品类型   购买时间(MMddHH)  购买金额
>
>
>
>
> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
>
>
>
> INSERT INTO mysql_sink_table
>
> SELECT category, dt, LAST_VALUE(total)
>
> OVER (
>
>   PARTITION BY category
>
>   ORDER BY PROCTIME()
>
>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
>
> ) AS var1
>
> FROM (
>
>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt
>
> );


Re: flink sql回撤流sink优化问题

2021-12-25 文章 郭伟权
结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量

casel.chen  于2021年12月23日周四 08:15写道:

> flink sql中aggregate without
> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?
>
>
> 例如有下面binlog cdc购买数据(订单购买金额会更新):
>
> orderid.   categorydt  amt
>
> 订单id 商品类型   购买时间(MMddHH)  购买金额
>
>
>
>
> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
>
>
>
> INSERT INTO mysql_sink_table
>
> SELECT category, dt, LAST_VALUE(total)
>
> OVER (
>
>   PARTITION BY category
>
>   ORDER BY PROCTIME()
>
>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
>
> ) AS var1
>
> FROM (
>
>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt
>
> );


Re: flink sql回撤流sink优化问题

2021-12-25 文章 郭伟权
结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量

casel.chen  于2021年12月23日周四 08:15写道:

> flink sql中aggregate without
> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?
>
>
> 例如有下面binlog cdc购买数据(订单购买金额会更新):
>
> orderid.   categorydt  amt
>
> 订单id 商品类型   购买时间(MMddHH)  购买金额
>
>
>
>
> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
>
>
>
> INSERT INTO mysql_sink_table
>
> SELECT category, dt, LAST_VALUE(total)
>
> OVER (
>
>   PARTITION BY category
>
>   ORDER BY PROCTIME()
>
>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
>
> ) AS var1
>
> FROM (
>
>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt
>
> );


flink sql回撤流sink优化问题

2021-12-22 文章 casel.chen
flink sql中aggregate without 
window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?


例如有下面binlog cdc购买数据(订单购买金额会更新):

orderid.   categorydt  amt

订单id 商品类型   购买时间(MMddHH)  购买金额




按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时



INSERT INTO mysql_sink_table 

SELECT category, dt, LAST_VALUE(total)

OVER (

  PARTITION BY category

  ORDER BY PROCTIME() 

  RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW

) AS var1

FROM (

  SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt

);