Re: TUMBLE函数不支持 回撤流
@LakeShen。 怎么看是append/retract数据流呢?是通过逻辑自己判定还是说有什么flink层面的信息直接反映。 LakeShen 于2020年11月4日周三 上午10:12写道: > Hi 夜思流年梦, > > 看下你的 dwd_XXX 这张表的类型,是 append 数据流,还是 retract 数据流。 > 如果是 retract ,应该就不能再上面进行窗口计算了。 > > Best, > LakeShen > > 史 正超 于2020年11月3日周二 下午6:34写道: > > > canal-json 的format也是会有delete 和update的数据的,同样changelog-json也是。他们的都支持 INSERT > > UPDATE DELETE, 相关代码如下: > > > > @Override > > public ChangelogMode getChangelogMode() { > >return ChangelogMode.newBuilder() > > .addContainedKind(RowKind.INSERT) > > .addContainedKind(RowKind.UPDATE_BEFORE) > > .addContainedKind(RowKind.UPDATE_AFTER) > > .addContainedKind(RowKind.DELETE) > > .build(); > > } > > > > 所以在window里消费带有update和delete的数据现在应该是不支持的。 > > ____________ > > 发件人: 夜思流年梦 > > 发送时间: 2020年11月3日 9:46 > > 收件人: user-zh@flink.apache.org > > 主题: TUMBLE函数不支持 回撤流 > > > > > > > > > > 这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性; > > > > > > > > > > > > > > > > 原sql > > > > select 0 as id > > > > , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime > > > > ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') > > then memberid else NULL end) as paynum_h > > > > ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, > > '-MM-dd') then real_product else 0 end)) as paymoney_h > > > > from dwd_XXX > > > > where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') > > > > group by TUMBLE(proctime ,interval '1' HOUR); > > > > > > 报错: > > org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't > > support consuming update and delete changes which is produced by node > > TableSourceScan > > 发现把kafka建表语句改成 json格式就可以 > > > > > > 数据源不是flink-mysql-cdc得来的 > > > > > > 是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表), > > > > > > 'connector' = 'kafka', > > 'properties.group.id' = 'XX', > > 'properties.bootstrap.servers' = 'XX', > > 'topic' = 'ODS_XXX', > > 'scan.startup.mode' = 'group-offsets', > > 'format' = 'canal-json'); > > > > > > 上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的, > > 建kafka表的格式,使用的changelog-json: > > > > > > WITH ( > > 'connector' = 'kafka', > > 'properties.group.id' = 'XX', > > 'properties.bootstrap.servers' = 'XXX', > > 'topic' = 'DWD_XXX', > > 'scan.startup.mode' = 'group-offsets', > > 'format' = 'changelog-json'); > > > > > > > > > > > > > > > > > > > > > > > > 在 2020-10-30 14:53:09,"admin" <17626017...@163.com> 写道: > > >Hi, > > >能贴一下完整的sql吗,数据源是CDC的数据吗? > > > > > >> 2020年10月30日 下午2:48,夜思流年梦 写道: > > >> > > >> 开发者你好: > > >> 现有此场景: > > >> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流 > > >> select > > >> > > >>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime > > >> > > >>> ,sum(amt) as paymoney_h > > >> > > >>> from > > >> > > >>> group by TUMBLE(write_time,interval '1' HOUR); > > >> > > >> > > >> 报错: > > >> org.apache.flink.table.api.TableException: GroupWindowAggregate > doesn't > > support consuming update and delete changes which is produced by node > > TableSourceScan > > >> > > >> > > >> > > >> > > >> 发现把kafka建表语句改成 json格式就可以 > > >> > > >> > > >> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊 > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > > > > > > > > > > > > > >
Re: TUMBLE函数不支持 回撤流
Hi 夜思流年梦, 看下你的 dwd_XXX 这张表的类型,是 append 数据流,还是 retract 数据流。 如果是 retract ,应该就不能再上面进行窗口计算了。 Best, LakeShen 史 正超 于2020年11月3日周二 下午6:34写道: > canal-json 的format也是会有delete 和update的数据的,同样changelog-json也是。他们的都支持 INSERT > UPDATE DELETE, 相关代码如下: > > @Override > public ChangelogMode getChangelogMode() { >return ChangelogMode.newBuilder() > .addContainedKind(RowKind.INSERT) > .addContainedKind(RowKind.UPDATE_BEFORE) > .addContainedKind(RowKind.UPDATE_AFTER) > .addContainedKind(RowKind.DELETE) > .build(); > } > > 所以在window里消费带有update和delete的数据现在应该是不支持的。 > > 发件人: 夜思流年梦 > 发送时间: 2020年11月3日 9:46 > 收件人: user-zh@flink.apache.org > 主题: TUMBLE函数不支持 回撤流 > > > > > 这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性; > > > > > > > > 原sql > > select 0 as id > > , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime > > ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') > then memberid else NULL end) as paynum_h > > ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, > '-MM-dd') then real_product else 0 end)) as paymoney_h > > from dwd_XXX > > where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') > > group by TUMBLE(proctime ,interval '1' HOUR); > > > 报错: > org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't > support consuming update and delete changes which is produced by node > TableSourceScan > 发现把kafka建表语句改成 json格式就可以 > > > 数据源不是flink-mysql-cdc得来的 > > > 是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表), > > > 'connector' = 'kafka', > 'properties.group.id' = 'XX', > 'properties.bootstrap.servers' = 'XX', > 'topic' = 'ODS_XXX', > 'scan.startup.mode' = 'group-offsets', > 'format' = 'canal-json'); > > > 上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的, > 建kafka表的格式,使用的changelog-json: > > > WITH ( > 'connector' = 'kafka', > 'properties.group.id' = 'XX', > 'properties.bootstrap.servers' = 'XXX', > 'topic' = 'DWD_XXX', > 'scan.startup.mode' = 'group-offsets', > 'format' = 'changelog-json'); > > > > > > > > > > > > 在 2020-10-30 14:53:09,"admin" <17626017...@163.com> 写道: > >Hi, > >能贴一下完整的sql吗,数据源是CDC的数据吗? > > > >> 2020年10月30日 下午2:48,夜思流年梦 写道: > >> > >> 开发者你好: > >> 现有此场景: > >> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流 > >> select > >> > >>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime > >> > >>> ,sum(amt) as paymoney_h > >> > >>> from > >> > >>> group by TUMBLE(write_time,interval '1' HOUR); > >> > >> > >> 报错: > >> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't > support consuming update and delete changes which is produced by node > TableSourceScan > >> > >> > >> > >> > >> 发现把kafka建表语句改成 json格式就可以 > >> > >> > >> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊 > >> > >> > >> > >> > >> > >> > >> > >> > >> > > > > > > >
回复: TUMBLE函数不支持 回撤流
canal-json 的format也是会有delete 和update的数据的,同样changelog-json也是。他们的都支持 INSERT UPDATE DELETE, 相关代码如下: @Override public ChangelogMode getChangelogMode() { return ChangelogMode.newBuilder() .addContainedKind(RowKind.INSERT) .addContainedKind(RowKind.UPDATE_BEFORE) .addContainedKind(RowKind.UPDATE_AFTER) .addContainedKind(RowKind.DELETE) .build(); } 所以在window里消费带有update和delete的数据现在应该是不支持的。 发件人: 夜思流年梦 发送时间: 2020年11月3日 9:46 收件人: user-zh@flink.apache.org 主题: TUMBLE函数不支持 回撤流 这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性; 原sql select 0 as id , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') then memberid else NULL end) as paynum_h ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') then real_product else 0 end)) as paymoney_h from dwd_XXX where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') group by TUMBLE(proctime ,interval '1' HOUR); 报错: org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan 发现把kafka建表语句改成 json格式就可以 数据源不是flink-mysql-cdc得来的 是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表), 'connector' = 'kafka', 'properties.group.id' = 'XX', 'properties.bootstrap.servers' = 'XX', 'topic' = 'ODS_XXX', 'scan.startup.mode' = 'group-offsets', 'format' = 'canal-json'); 上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的, 建kafka表的格式,使用的changelog-json: WITH ( 'connector' = 'kafka', 'properties.group.id' = 'XX', 'properties.bootstrap.servers' = 'XXX', 'topic' = 'DWD_XXX', 'scan.startup.mode' = 'group-offsets', 'format' = 'changelog-json'); 在 2020-10-30 14:53:09,"admin" <17626017...@163.com> 写道: >Hi, >能贴一下完整的sql吗,数据源是CDC的数据吗? > >> 2020年10月30日 下午2:48,夜思流年梦 写道: >> >> 开发者你好: >> 现有此场景: >> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流 >> select >> >>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime >> >>> ,sum(amt) as paymoney_h >> >>> from >> >>> group by TUMBLE(write_time,interval '1' HOUR); >> >> >> 报错: >> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't >> support consuming update and delete changes which is produced by node >> TableSourceScan >> >> >> >> >> 发现把kafka建表语句改成 json格式就可以 >> >> >> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊 >> >> >> >> >> >> >> >> >>
TUMBLE函数不支持 回撤流
这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性; 原sql select 0 as id , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') then memberid else NULL end) as paynum_h ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') then real_product else 0 end)) as paymoney_h from dwd_XXX where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') group by TUMBLE(proctime ,interval '1' HOUR); 报错: org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan 发现把kafka建表语句改成 json格式就可以 数据源不是flink-mysql-cdc得来的 是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表), 'connector' = 'kafka', 'properties.group.id' = 'XX', 'properties.bootstrap.servers' = 'XX', 'topic' = 'ODS_XXX', 'scan.startup.mode' = 'group-offsets', 'format' = 'canal-json'); 上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的, 建kafka表的格式,使用的changelog-json: WITH ( 'connector' = 'kafka', 'properties.group.id' = 'XX', 'properties.bootstrap.servers' = 'XXX', 'topic' = 'DWD_XXX', 'scan.startup.mode' = 'group-offsets', 'format' = 'changelog-json'); 在 2020-10-30 14:53:09,"admin" <17626017...@163.com> 写道: >Hi, >能贴一下完整的sql吗,数据源是CDC的数据吗? > >> 2020年10月30日 下午2:48,夜思流年梦 写道: >> >> 开发者你好: >> 现有此场景: >> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流 >> select >> >>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime >> >>> ,sum(amt) as paymoney_h >> >>> from >> >>> group by TUMBLE(write_time,interval '1' HOUR); >> >> >> 报错: >> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't >> support consuming update and delete changes which is produced by node >> TableSourceScan >> >> >> >> >> 发现把kafka建表语句改成 json格式就可以 >> >> >> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊 >> >> >> >> >> >> >> >> >>
Re: TUMBLE函数不支持 回撤流
退订 Regards Alex Wang | BigData Architect Email: wanglei...@163.com > 2020年10月30日 15:12,夜思流年梦 写道: > > 原sql > > select 0 as id > > , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime > > ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') then > memberid else NULL end) as paynum_h > > ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') > then real_product else 0 end)) as paymoney_h > > from dwd_XXX > > where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') > > group by TUMBLE(proctime ,interval '1' HOUR); > > > 数据源不是flink-mysql-cdc得来的 > > > 是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表), > > > 'connector' = 'kafka', > 'properties.group.id' = 'XX', > 'properties.bootstrap.servers' = 'XX', > 'topic' = 'ODS_XXX', > 'scan.startup.mode' = 'group-offsets', > 'format' = 'canal-json'); > > > 上面这个dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的, > 建kafka表的格式,使用的changelog-json: > > > WITH ( > 'connector' = 'kafka', > 'properties.group.id' = 'XX', > 'properties.bootstrap.servers' = 'XXX', > 'topic' = 'DWD_XXX', > 'scan.startup.mode' = 'group-offsets', > 'format' = 'changelog-json'); > > > > > > > > > > > > 在 2020-10-30 14:53:09,"admin" <17626017...@163.com> 写道: >> Hi, >> 能贴一下完整的sql吗,数据源是CDC的数据吗? >> >>> 2020年10月30日 下午2:48,夜思流年梦 写道: >>> >>> 开发者你好: >>> 现有此场景: >>> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流 >>> select >>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime >>> ,sum(amt) as paymoney_h >>> from >>> group by TUMBLE(write_time,interval '1' HOUR); >>> >>> >>> 报错: >>> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't >>> support consuming update and delete changes which is produced by node >>> TableSourceScan >>> >>> >>> >>> >>> 发现把kafka建表语句改成 json格式就可以 >>> >>> >>> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊 >>> >>> >>> >>> >>> >>> >>> >>> >>>
Re:Re: TUMBLE函数不支持 回撤流
原sql select 0 as id , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') then memberid else NULL end) as paynum_h ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') then real_product else 0 end)) as paymoney_h from dwd_XXX where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') group by TUMBLE(proctime ,interval '1' HOUR); 数据源不是flink-mysql-cdc得来的 是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表), 'connector' = 'kafka', 'properties.group.id' = 'XX', 'properties.bootstrap.servers' = 'XX', 'topic' = 'ODS_XXX', 'scan.startup.mode' = 'group-offsets', 'format' = 'canal-json'); 上面这个dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的, 建kafka表的格式,使用的changelog-json: WITH ( 'connector' = 'kafka', 'properties.group.id' = 'XX', 'properties.bootstrap.servers' = 'XXX', 'topic' = 'DWD_XXX', 'scan.startup.mode' = 'group-offsets', 'format' = 'changelog-json'); 在 2020-10-30 14:53:09,"admin" <17626017...@163.com> 写道: >Hi, >能贴一下完整的sql吗,数据源是CDC的数据吗? > >> 2020年10月30日 下午2:48,夜思流年梦 写道: >> >> 开发者你好: >> 现有此场景: >> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流 >> select >> >>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime >> >>> ,sum(amt) as paymoney_h >> >>> from >> >>> group by TUMBLE(write_time,interval '1' HOUR); >> >> >> 报错: >> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't >> support consuming update and delete changes which is produced by node >> TableSourceScan >> >> >> >> >> 发现把kafka建表语句改成 json格式就可以 >> >> >> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊 >> >> >> >> >> >> >> >> >>
Re: TUMBLE函数不支持 回撤流
Hi, 能贴一下完整的sql吗,数据源是CDC的数据吗? > 2020年10月30日 下午2:48,夜思流年梦 写道: > > 开发者你好: > 现有此场景: > 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流 > select > >> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime > >> ,sum(amt) as paymoney_h > >> from > >> group by TUMBLE(write_time,interval '1' HOUR); > > > 报错: > org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't > support consuming update and delete changes which is produced by node > TableSourceScan > > > > > 发现把kafka建表语句改成 json格式就可以 > > > 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊 > > > > > > > > >
TUMBLE函数不支持 回撤流
开发者你好: 现有此场景: 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流 select > HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime > ,sum(amt) as paymoney_h > from > group by TUMBLE(write_time,interval '1' HOUR); 报错: org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan 发现把kafka建表语句改成 json格式就可以 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊