Re: Re: Re: flink sql回撤流sink优化问题
mini-batch对aggregate算子是有效的,开启了之后它的输出会降低一些,从而降低了sink的输出压力。 casel.chen 于2022年1月7日周五 07:42写道: > mini-batch优化针对sink算子也有效吗?我是直接aggregate without window然后将聚合结果输出到sink算子。 > > > > > > > > > > > > > > > > > > 在 2022-01-06 20:43:00,"Benchao Li" 写道: > >这个问题可以用mini-batch[1]来解决呀 > > > >[1] > > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation > > > >casel.chen 于2021年12月26日周日 18:01写道: > > > >> 你说的是upsert-kafka的这两个参数吗? > >> > >> sink.buffer-flush.max-rows > >> sink.buffer-flush.interval > >> 确实能达到我想要的效果,但是会引入额外的kafka sink,另外还是从sink > >> kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。 > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 在 2021-12-25 22:54:19,"郭伟权" 写道: > >> > >> > >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量 > >> > > >> >casel.chen 于2021年12月23日周四 08:15写道: > >> > > >> >> flink sql中aggregate without > >> >> > window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql > >> >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? > >> >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? > >> >> > >> >> > >> >> 例如有下面binlog cdc购买数据(订单购买金额会更新): > >> >> > >> >> orderid. categorydt > amt > >> >> > >> >> 订单id 商品类型 购买时间(MMddHH) 购买金额 > >> >> > >> >> > >> >> > >> >> > >> >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时 > >> >> > >> >> > >> >> > >> >> INSERT INTO mysql_sink_table > >> >> > >> >> SELECT category, dt, LAST_VALUE(total) > >> >> > >> >> OVER ( > >> >> > >> >> PARTITION BY category > >> >> > >> >> ORDER BY PROCTIME() > >> >> > >> >> RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW > >> >> > >> >> ) AS var1 > >> >> > >> >> FROM ( > >> >> > >> >> SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, > dt > >> >> > >> >> ); > >> > > > > > >-- > > > >Best, > >Benchao Li > -- Best, Benchao Li
Re:Re: Re: flink sql回撤流sink优化问题
mini-batch优化针对sink算子也有效吗?我是直接aggregate without window然后将聚合结果输出到sink算子。 在 2022-01-06 20:43:00,"Benchao Li" 写道: >这个问题可以用mini-batch[1]来解决呀 > >[1] >https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation > >casel.chen 于2021年12月26日周日 18:01写道: > >> 你说的是upsert-kafka的这两个参数吗? >> >> sink.buffer-flush.max-rows >> sink.buffer-flush.interval >> 确实能达到我想要的效果,但是会引入额外的kafka sink,另外还是从sink >> kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2021-12-25 22:54:19,"郭伟权" 写道: >> >> >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量 >> > >> >casel.chen 于2021年12月23日周四 08:15写道: >> > >> >> flink sql中aggregate without >> >> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql >> >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? >> >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? >> >> >> >> >> >> 例如有下面binlog cdc购买数据(订单购买金额会更新): >> >> >> >> orderid. categorydt amt >> >> >> >> 订单id 商品类型 购买时间(MMddHH) 购买金额 >> >> >> >> >> >> >> >> >> >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时 >> >> >> >> >> >> >> >> INSERT INTO mysql_sink_table >> >> >> >> SELECT category, dt, LAST_VALUE(total) >> >> >> >> OVER ( >> >> >> >> PARTITION BY category >> >> >> >> ORDER BY PROCTIME() >> >> >> >> RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW >> >> >> >> ) AS var1 >> >> >> >> FROM ( >> >> >> >> SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt >> >> >> >> ); >> > > >-- > >Best, >Benchao Li
Re: Re: flink sql回撤流sink优化问题
这个问题可以用mini-batch[1]来解决呀 [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation casel.chen 于2021年12月26日周日 18:01写道: > 你说的是upsert-kafka的这两个参数吗? > > sink.buffer-flush.max-rows > sink.buffer-flush.interval > 确实能达到我想要的效果,但是会引入额外的kafka sink,另外还是从sink > kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。 > > > > > > > > > > > > > > > > > > 在 2021-12-25 22:54:19,"郭伟权" 写道: > > >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量 > > > >casel.chen 于2021年12月23日周四 08:15写道: > > > >> flink sql中aggregate without > >> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql > >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? > >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? > >> > >> > >> 例如有下面binlog cdc购买数据(订单购买金额会更新): > >> > >> orderid. categorydt amt > >> > >> 订单id 商品类型 购买时间(MMddHH) 购买金额 > >> > >> > >> > >> > >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时 > >> > >> > >> > >> INSERT INTO mysql_sink_table > >> > >> SELECT category, dt, LAST_VALUE(total) > >> > >> OVER ( > >> > >> PARTITION BY category > >> > >> ORDER BY PROCTIME() > >> > >> RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW > >> > >> ) AS var1 > >> > >> FROM ( > >> > >> SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt > >> > >> ); > -- Best, Benchao Li
Re:Re: flink sql回撤流sink优化问题
你说的是upsert-kafka的这两个参数吗? sink.buffer-flush.max-rows sink.buffer-flush.interval 确实能达到我想要的效果,但是会引入额外的kafka sink,另外还是从sink kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。 在 2021-12-25 22:54:19,"郭伟权" 写道: >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量 > >casel.chen 于2021年12月23日周四 08:15写道: > >> flink sql中aggregate without >> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? >> >> >> 例如有下面binlog cdc购买数据(订单购买金额会更新): >> >> orderid. categorydt amt >> >> 订单id 商品类型 购买时间(MMddHH) 购买金额 >> >> >> >> >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时 >> >> >> >> INSERT INTO mysql_sink_table >> >> SELECT category, dt, LAST_VALUE(total) >> >> OVER ( >> >> PARTITION BY category >> >> ORDER BY PROCTIME() >> >> RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW >> >> ) AS var1 >> >> FROM ( >> >> SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt >> >> );
Re:Re: flink sql回撤流sink优化问题
jdbc sink的buffer-flush不会减少写入的数据量,只是变成微批写入而已,mysql写入的压力并没有减少。 而我想要实现的效果是会减少写的数据量,因为同一个key的数据被聚合成最后一条。 在 2021-12-26 09:43:47,"Zhiwen Sun" 写道: >不用那么复杂,正常的 insert select group by 即可, 一分钟写一次 mysql 就行。 > >参考 JDBC sink [1] 中的 sink.buffer-flush.interval 和 sink.buffer-flush.max-rows >参数 > >[1] : >https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/ > > >Zhiwen Sun > > > >On Thu, Dec 23, 2021 at 8:15 AM casel.chen wrote: > >> flink sql中aggregate without >> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? >> >> >> 例如有下面binlog cdc购买数据(订单购买金额会更新): >> >> orderid. categorydt amt >> >> 订单id 商品类型 购买时间(MMddHH) 购买金额 >> >> >> >> >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时 >> >> >> >> INSERT INTO mysql_sink_table >> >> SELECT category, dt, LAST_VALUE(total) >> >> OVER ( >> >> PARTITION BY category >> >> ORDER BY PROCTIME() >> >> RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW >> >> ) AS var1 >> >> FROM ( >> >> SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt >> >> );
Re: flink sql回撤流sink优化问题
不用那么复杂,正常的 insert select group by 即可, 一分钟写一次 mysql 就行。 参考 JDBC sink [1] 中的 sink.buffer-flush.interval 和 sink.buffer-flush.max-rows 参数 [1] : https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/ Zhiwen Sun On Thu, Dec 23, 2021 at 8:15 AM casel.chen wrote: > flink sql中aggregate without > window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql > 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? > 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? > > > 例如有下面binlog cdc购买数据(订单购买金额会更新): > > orderid. categorydt amt > > 订单id 商品类型 购买时间(MMddHH) 购买金额 > > > > > 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时 > > > > INSERT INTO mysql_sink_table > > SELECT category, dt, LAST_VALUE(total) > > OVER ( > > PARTITION BY category > > ORDER BY PROCTIME() > > RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW > > ) AS var1 > > FROM ( > > SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt > > );
Re: flink sql回撤流sink优化问题
结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量 casel.chen 于2021年12月23日周四 08:15写道: > flink sql中aggregate without > window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql > 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? > 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? > > > 例如有下面binlog cdc购买数据(订单购买金额会更新): > > orderid. categorydt amt > > 订单id 商品类型 购买时间(MMddHH) 购买金额 > > > > > 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时 > > > > INSERT INTO mysql_sink_table > > SELECT category, dt, LAST_VALUE(total) > > OVER ( > > PARTITION BY category > > ORDER BY PROCTIME() > > RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW > > ) AS var1 > > FROM ( > > SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt > > );
Re: flink sql回撤流sink优化问题
结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量 casel.chen 于2021年12月23日周四 08:15写道: > flink sql中aggregate without > window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql > 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? > 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? > > > 例如有下面binlog cdc购买数据(订单购买金额会更新): > > orderid. categorydt amt > > 订单id 商品类型 购买时间(MMddHH) 购买金额 > > > > > 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时 > > > > INSERT INTO mysql_sink_table > > SELECT category, dt, LAST_VALUE(total) > > OVER ( > > PARTITION BY category > > ORDER BY PROCTIME() > > RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW > > ) AS var1 > > FROM ( > > SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt > > );
flink sql回撤流sink优化问题
flink sql中aggregate without window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? 例如有下面binlog cdc购买数据(订单购买金额会更新): orderid. categorydt amt 订单id 商品类型 购买时间(MMddHH) 购买金额 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时 INSERT INTO mysql_sink_table SELECT category, dt, LAST_VALUE(total) OVER ( PARTITION BY category ORDER BY PROCTIME() RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW ) AS var1 FROM ( SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt );