Re: TUMBLE函数不支持 回撤流

2020-11-22 文章 赵一旦
@LakeShen。 怎么看是append/retract数据流呢?是通过逻辑自己判定还是说有什么flink层面的信息直接反映。

LakeShen  于2020年11月4日周三 上午10:12写道:

> Hi 夜思流年梦,
>
> 看下你的 dwd_XXX 这张表的类型,是 append 数据流,还是 retract 数据流。
> 如果是 retract ,应该就不能再上面进行窗口计算了。
>
> Best,
> LakeShen
>
> 史 正超  于2020年11月3日周二 下午6:34写道:
>
> > canal-json 的format也是会有delete 和update的数据的,同样changelog-json也是。他们的都支持 INSERT
> > UPDATE DELETE, 相关代码如下:
> >
> > @Override
> > public ChangelogMode getChangelogMode() {
> >return ChangelogMode.newBuilder()
> >   .addContainedKind(RowKind.INSERT)
> >   .addContainedKind(RowKind.UPDATE_BEFORE)
> >   .addContainedKind(RowKind.UPDATE_AFTER)
> >   .addContainedKind(RowKind.DELETE)
> >   .build();
> > }
> >
> > 所以在window里消费带有update和delete的数据现在应该是不支持的。
> > ____________
> > 发件人: 夜思流年梦 
> > 发送时间: 2020年11月3日 9:46
> > 收件人: user-zh@flink.apache.org 
> > 主题: TUMBLE函数不支持 回撤流
> >
> >
> >
> >
> > 这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性;
> >
> >
> >
> >
> >
> >
> >
> > 原sql
> >
> > select 0 as id
> >
> > , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime
> >
> > ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')
> > then memberid else NULL end) as paynum_h
> >
> > ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP,
> > '-MM-dd')  then real_product else 0 end)) as paymoney_h
> >
> > from dwd_XXX
> >
> > where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')
> >
> > group by TUMBLE(proctime ,interval '1' HOUR);
> >
> >
> > 报错:
> >  org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't
> > support consuming update and delete changes which is produced by node
> > TableSourceScan
> > 发现把kafka建表语句改成 json格式就可以
> >
> >
> > 数据源不是flink-mysql-cdc得来的
> >
> >
> > 是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),
> >
> >
> >  'connector' = 'kafka',
> >   'properties.group.id' = 'XX',
> >   'properties.bootstrap.servers' = 'XX',
> >   'topic' = 'ODS_XXX',
> >   'scan.startup.mode' = 'group-offsets',
> >   'format' = 'canal-json');
> >
> >
> > 上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
> > 建kafka表的格式,使用的changelog-json:
> >
> >
> > WITH (
> >   'connector' = 'kafka',
> >   'properties.group.id' = 'XX',
> >   'properties.bootstrap.servers' = 'XXX',
> >   'topic' = 'DWD_XXX',
> >   'scan.startup.mode' = 'group-offsets',
> >   'format' = 'changelog-json');
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-10-30 14:53:09,"admin" <17626017...@163.com> 写道:
> > >Hi,
> > >能贴一下完整的sql吗,数据源是CDC的数据吗?
> > >
> > >> 2020年10月30日 下午2:48,夜思流年梦  写道:
> > >>
> > >> 开发者你好:
> > >> 现有此场景:
> > >> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
> > >> select
> > >>
> > >>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
> > >>
> > >>> ,sum(amt) as paymoney_h
> > >>
> > >>> from 
> > >>
> > >>> group by TUMBLE(write_time,interval '1' HOUR);
> > >>
> > >>
> > >> 报错:
> > >> org.apache.flink.table.api.TableException: GroupWindowAggregate
> doesn't
> > support consuming update and delete changes which is produced by node
> > TableSourceScan
> > >>
> > >>
> > >>
> > >>
> > >> 发现把kafka建表语句改成 json格式就可以
> > >>
> > >>
> > >> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> >
> >
> >
> >
> >
> >
> >
>


Re: TUMBLE函数不支持 回撤流

2020-11-03 文章 LakeShen
Hi 夜思流年梦,

看下你的 dwd_XXX 这张表的类型,是 append 数据流,还是 retract 数据流。
如果是 retract ,应该就不能再上面进行窗口计算了。

Best,
LakeShen

史 正超  于2020年11月3日周二 下午6:34写道:

> canal-json 的format也是会有delete 和update的数据的,同样changelog-json也是。他们的都支持 INSERT
> UPDATE DELETE, 相关代码如下:
>
> @Override
> public ChangelogMode getChangelogMode() {
>return ChangelogMode.newBuilder()
>   .addContainedKind(RowKind.INSERT)
>   .addContainedKind(RowKind.UPDATE_BEFORE)
>   .addContainedKind(RowKind.UPDATE_AFTER)
>   .addContainedKind(RowKind.DELETE)
>   .build();
> }
>
> 所以在window里消费带有update和delete的数据现在应该是不支持的。
> 
> 发件人: 夜思流年梦 
> 发送时间: 2020年11月3日 9:46
> 收件人: user-zh@flink.apache.org 
> 主题: TUMBLE函数不支持 回撤流
>
>
>
>
> 这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性;
>
>
>
>
>
>
>
> 原sql
>
> select 0 as id
>
> , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime
>
> ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')
> then memberid else NULL end) as paynum_h
>
> ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP,
> '-MM-dd')  then real_product else 0 end)) as paymoney_h
>
> from dwd_XXX
>
> where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')
>
> group by TUMBLE(proctime ,interval '1' HOUR);
>
>
> 报错:
>  org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't
> support consuming update and delete changes which is produced by node
> TableSourceScan
> 发现把kafka建表语句改成 json格式就可以
>
>
> 数据源不是flink-mysql-cdc得来的
>
>
> 是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),
>
>
>  'connector' = 'kafka',
>   'properties.group.id' = 'XX',
>   'properties.bootstrap.servers' = 'XX',
>   'topic' = 'ODS_XXX',
>   'scan.startup.mode' = 'group-offsets',
>   'format' = 'canal-json');
>
>
> 上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
> 建kafka表的格式,使用的changelog-json:
>
>
> WITH (
>   'connector' = 'kafka',
>   'properties.group.id' = 'XX',
>   'properties.bootstrap.servers' = 'XXX',
>   'topic' = 'DWD_XXX',
>   'scan.startup.mode' = 'group-offsets',
>   'format' = 'changelog-json');
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-10-30 14:53:09,"admin" <17626017...@163.com> 写道:
> >Hi,
> >能贴一下完整的sql吗,数据源是CDC的数据吗?
> >
> >> 2020年10月30日 下午2:48,夜思流年梦  写道:
> >>
> >> 开发者你好:
> >> 现有此场景:
> >> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
> >> select
> >>
> >>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
> >>
> >>> ,sum(amt) as paymoney_h
> >>
> >>> from 
> >>
> >>> group by TUMBLE(write_time,interval '1' HOUR);
> >>
> >>
> >> 报错:
> >> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't
> support consuming update and delete changes which is produced by node
> TableSourceScan
> >>
> >>
> >>
> >>
> >> 发现把kafka建表语句改成 json格式就可以
> >>
> >>
> >> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
>
>
>
>
>
>
>


回复: TUMBLE函数不支持 回撤流

2020-11-03 文章 史 正超
canal-json 的format也是会有delete 和update的数据的,同样changelog-json也是。他们的都支持 INSERT 
UPDATE DELETE, 相关代码如下:

@Override
public ChangelogMode getChangelogMode() {
   return ChangelogMode.newBuilder()
  .addContainedKind(RowKind.INSERT)
  .addContainedKind(RowKind.UPDATE_BEFORE)
  .addContainedKind(RowKind.UPDATE_AFTER)
  .addContainedKind(RowKind.DELETE)
  .build();
}

所以在window里消费带有update和delete的数据现在应该是不支持的。

发件人: 夜思流年梦 
发送时间: 2020年11月3日 9:46
收件人: user-zh@flink.apache.org 
主题: TUMBLE函数不支持 回撤流




这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性;







原sql

select 0 as id

, HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime

,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') then 
memberid else NULL end) as paynum_h

,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')  
then real_product else 0 end)) as paymoney_h

from dwd_XXX

where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')

group by TUMBLE(proctime ,interval '1' HOUR);


报错:
 org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't 
support consuming update and delete changes which is produced by node 
TableSourceScan
发现把kafka建表语句改成 json格式就可以


数据源不是flink-mysql-cdc得来的


是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),


 'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XX',
  'topic' = 'ODS_XXX',
  'scan.startup.mode' = 'group-offsets',
  'format' = 'canal-json');


上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
建kafka表的格式,使用的changelog-json:


WITH (
  'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XXX',
  'topic' = 'DWD_XXX',
  'scan.startup.mode' = 'group-offsets',
  'format' = 'changelog-json');











在 2020-10-30 14:53:09,"admin" <17626017...@163.com> 写道:
>Hi,
>能贴一下完整的sql吗,数据源是CDC的数据吗?
>
>> 2020年10月30日 下午2:48,夜思流年梦  写道:
>>
>> 开发者你好:
>> 现有此场景:
>> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
>> select
>>
>>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
>>
>>> ,sum(amt) as paymoney_h
>>
>>> from 
>>
>>> group by TUMBLE(write_time,interval '1' HOUR);
>>
>>
>> 报错:
>> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't 
>> support consuming update and delete changes which is produced by node 
>> TableSourceScan
>>
>>
>>
>>
>> 发现把kafka建表语句改成 json格式就可以
>>
>>
>> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
>>
>>
>>
>>
>>
>>
>>
>>
>>








TUMBLE函数不支持 回撤流

2020-11-03 文章 夜思流年梦



这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性;







原sql 

select 0 as id

, HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime

,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') then 
memberid else NULL end) as paynum_h 

,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')  
then real_product else 0 end)) as paymoney_h  

from dwd_XXX

where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')

group by TUMBLE(proctime ,interval '1' HOUR);


报错:
 org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't 
support consuming update and delete changes which is produced by node 
TableSourceScan
发现把kafka建表语句改成 json格式就可以


数据源不是flink-mysql-cdc得来的


是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),


 'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XX',
  'topic' = 'ODS_XXX',  
  'scan.startup.mode' = 'group-offsets',
  'format' = 'canal-json');


上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
建kafka表的格式,使用的changelog-json:


WITH (
  'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XXX',
  'topic' = 'DWD_XXX',  
  'scan.startup.mode' = 'group-offsets',
  'format' = 'changelog-json');  











在 2020-10-30 14:53:09,"admin" <17626017...@163.com> 写道:
>Hi,
>能贴一下完整的sql吗,数据源是CDC的数据吗?
>
>> 2020年10月30日 下午2:48,夜思流年梦  写道:
>> 
>> 开发者你好:
>> 现有此场景:
>> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
>> select 
>> 
>>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
>> 
>>> ,sum(amt) as paymoney_h  
>> 
>>> from 
>> 
>>> group by TUMBLE(write_time,interval '1' HOUR);
>> 
>> 
>> 报错:
>> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't 
>> support consuming update and delete changes which is produced by node 
>> TableSourceScan
>> 
>> 
>> 
>> 
>> 发现把kafka建表语句改成 json格式就可以
>> 
>> 
>> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 





 

Re: TUMBLE函数不支持 回撤流

2020-10-30 文章 wanglei...@163.com
退订


Regards
Alex Wang | BigData Architect
Email: wanglei...@163.com

> 2020年10月30日 15:12,夜思流年梦  写道:
> 
> 原sql
> 
> select 0 as id
> 
> , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime
> 
> ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') then 
> memberid else NULL end) as paynum_h 
> 
> ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')  
> then real_product else 0 end)) as paymoney_h  
> 
> from dwd_XXX
> 
> where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')
> 
> group by TUMBLE(proctime ,interval '1' HOUR);
> 
> 
> 数据源不是flink-mysql-cdc得来的
> 
> 
> 是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),
> 
> 
> 'connector' = 'kafka',
>  'properties.group.id' = 'XX',
>  'properties.bootstrap.servers' = 'XX',
>  'topic' = 'ODS_XXX',  
>  'scan.startup.mode' = 'group-offsets',
>  'format' = 'canal-json');
> 
> 
> 上面这个dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
> 建kafka表的格式,使用的changelog-json:
> 
> 
> WITH (
>  'connector' = 'kafka',
>  'properties.group.id' = 'XX',
>  'properties.bootstrap.servers' = 'XXX',
>  'topic' = 'DWD_XXX',  
>  'scan.startup.mode' = 'group-offsets',
>  'format' = 'changelog-json');  
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-10-30 14:53:09,"admin" <17626017...@163.com> 写道:
>> Hi,
>> 能贴一下完整的sql吗,数据源是CDC的数据吗?
>> 
>>> 2020年10月30日 下午2:48,夜思流年梦  写道:
>>> 
>>> 开发者你好:
>>> 现有此场景:
>>> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
>>> select 
>>> 
 HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
>>> 
 ,sum(amt) as paymoney_h  
>>> 
 from 
>>> 
 group by TUMBLE(write_time,interval '1' HOUR);
>>> 
>>> 
>>> 报错:
>>> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't 
>>> support consuming update and delete changes which is produced by node 
>>> TableSourceScan
>>> 
>>> 
>>> 
>>> 
>>> 发现把kafka建表语句改成 json格式就可以
>>> 
>>> 
>>> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 



Re:Re: TUMBLE函数不支持 回撤流

2020-10-30 文章 夜思流年梦
原sql

select 0 as id

, HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime

,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') then 
memberid else NULL end) as paynum_h 

,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')  
then real_product else 0 end)) as paymoney_h  

from dwd_XXX

where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')

group by TUMBLE(proctime ,interval '1' HOUR);


数据源不是flink-mysql-cdc得来的


是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),


 'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XX',
  'topic' = 'ODS_XXX',  
  'scan.startup.mode' = 'group-offsets',
  'format' = 'canal-json');


上面这个dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
建kafka表的格式,使用的changelog-json:


WITH (
  'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XXX',
  'topic' = 'DWD_XXX',  
  'scan.startup.mode' = 'group-offsets',
  'format' = 'changelog-json');  











在 2020-10-30 14:53:09,"admin" <17626017...@163.com> 写道:
>Hi,
>能贴一下完整的sql吗,数据源是CDC的数据吗?
>
>> 2020年10月30日 下午2:48,夜思流年梦  写道:
>> 
>> 开发者你好:
>> 现有此场景:
>> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
>> select 
>> 
>>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
>> 
>>> ,sum(amt) as paymoney_h  
>> 
>>> from 
>> 
>>> group by TUMBLE(write_time,interval '1' HOUR);
>> 
>> 
>> 报错:
>> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't 
>> support consuming update and delete changes which is produced by node 
>> TableSourceScan
>> 
>> 
>> 
>> 
>> 发现把kafka建表语句改成 json格式就可以
>> 
>> 
>> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 


Re: TUMBLE函数不支持 回撤流

2020-10-30 文章 admin
Hi,
能贴一下完整的sql吗,数据源是CDC的数据吗?

> 2020年10月30日 下午2:48,夜思流年梦  写道:
> 
> 开发者你好:
> 现有此场景:
> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
> select 
> 
>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
> 
>> ,sum(amt) as paymoney_h  
> 
>> from 
> 
>> group by TUMBLE(write_time,interval '1' HOUR);
> 
> 
> 报错:
> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't 
> support consuming update and delete changes which is produced by node 
> TableSourceScan
> 
> 
> 
> 
> 发现把kafka建表语句改成 json格式就可以
> 
> 
> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
> 
> 
> 
> 
> 
> 
> 
> 
> 



TUMBLE函数不支持 回撤流

2020-10-30 文章 夜思流年梦
开发者你好:
现有此场景:
求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
select 

> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime

> ,sum(amt) as paymoney_h  

> from 

> group by TUMBLE(write_time,interval '1' HOUR);


报错:
org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support 
consuming update and delete changes which is produced by node TableSourceScan




发现把kafka建表语句改成 json格式就可以


现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊