Re: Re: Re: FLINK SQL view的数据复用问题

2020-08-05 文章 godfrey he
目前sql-client还不支持。关于纯SQL文本statement set的支持,
目前社区已经达成语法的一致意见,应该后续会慢慢的支持。

kandy.wang  于2020年8月5日周三 下午10:43写道:

>
>
>
>
>
>
> @ godfrey
> 你说的这种StatementSet 提交方式,在sql-client提交任务的时候不支持吧? 可以给加上么。
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-08-04 19:36:56,"godfrey he"  写道:
> >调用 StatementSet#explain() 把结果打出来看看是否因 Deduplicate的digest不一样导致的没法复用
> >
> >kandy.wang  于2020年8月4日周二 下午6:21写道:
> >
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> @ godfrey
> >> thanks。刚试了一下,source -> Deduplicate  ->
> >> GlobalGroupAggregate,在souce端确实是复用了。但是Deduplicate 端是没有复用呢?理论上source +
> >> Deduplicate 都是view里的逻辑,都应该复用才对。就是感觉复用的还不够多呢。
> >>
> >>
> >> 在 2020-08-04 17:26:02,"godfrey he"  写道:
> >> >blink planner支持将多sink的query优化成尽量复用重复计算部分。
> >> >1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务
> >> >
> >> >kandy.wang  于2020年8月4日周二 下午5:20写道:
> >> >
> >> >> FLINK SQL view相关问题:
> >> >> create view order_source
> >> >>
> >> >> as
> >> >>
> >> >> select order_id, order_goods_id, user_id,...
> >> >>
> >> >> from (
> >> >>
> >> >> ..  proctime,row_number() over(partition by order_id,
> >> >> order_goods_id order by proctime desc) as rownum
> >> >>
> >> >> from hive.temp_dw.dm_trd_order_goods/*+ OPTIONS('
> >> properties.group.id'='flink_etl_kafka_hbase',
> >> >> 'scan.startup.mode'='latest-offset') */
> >> >>
> >> >> ) where  rownum = 1 and  price > 0;
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> insert into hive.temp_dw.day_order_index select rowkey,
> ROW(cast(saleN
> >> as
> >> >> BIGINT),)
> >> >>
> >> >> from
> >> >>
> >> >> (
> >> >>
> >> >> select order_date as rowkey,
> >> >>
> >> >> sum(amount) as saleN,
> >> >>
> >> >> from order_source
> >> >>
> >> >> group by order_date
> >> >>
> >> >> );
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> insert into hive.temp_dw.day_order_index select rowkey,
> ROW(cast(saleN
> >> as
> >> >> BIGINT))
> >> >>
> >> >> from
> >> >>
> >> >> (
> >> >>
> >> >> select order_hour as rowkey,sum(amount) as saleN,
> >> >>
> >> >>
> >> >>
> >> >> from order_source
> >> >>
> >> >> group by order_hour
> >> >>
> >> >> );
> >> >> 问题:同一个view,相同的消费group,不同的sink,产生 2个job。 这样的话,相当于2个job公用一个consumer
> group。
> >> >> 最后生成的job是 : a.  order_source  -> sink  1  b.  order_source  ->
> sink
> >> >> 2
> >> >>
> >> >>
> >> >> 本意是想通过view  order_source
> >> >> (view里需要对订单数据去重)复用同一份source全量数据,对应底层可以复用同一份state数据 ,如何做到 ?
> >> >>
> >> >>
> >>
>


Re:Re: Re: FLINK SQL view的数据复用问题

2020-08-05 文章 kandy.wang






@ godfrey
你说的这种StatementSet 提交方式,在sql-client提交任务的时候不支持吧? 可以给加上么。











在 2020-08-04 19:36:56,"godfrey he"  写道:
>调用 StatementSet#explain() 把结果打出来看看是否因 Deduplicate的digest不一样导致的没法复用
>
>kandy.wang  于2020年8月4日周二 下午6:21写道:
>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> @ godfrey
>> thanks。刚试了一下,source -> Deduplicate  ->
>> GlobalGroupAggregate,在souce端确实是复用了。但是Deduplicate 端是没有复用呢?理论上source +
>> Deduplicate 都是view里的逻辑,都应该复用才对。就是感觉复用的还不够多呢。
>>
>>
>> 在 2020-08-04 17:26:02,"godfrey he"  写道:
>> >blink planner支持将多sink的query优化成尽量复用重复计算部分。
>> >1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务
>> >
>> >kandy.wang  于2020年8月4日周二 下午5:20写道:
>> >
>> >> FLINK SQL view相关问题:
>> >> create view order_source
>> >>
>> >> as
>> >>
>> >> select order_id, order_goods_id, user_id,...
>> >>
>> >> from (
>> >>
>> >> ..  proctime,row_number() over(partition by order_id,
>> >> order_goods_id order by proctime desc) as rownum
>> >>
>> >> from hive.temp_dw.dm_trd_order_goods/*+ OPTIONS('
>> properties.group.id'='flink_etl_kafka_hbase',
>> >> 'scan.startup.mode'='latest-offset') */
>> >>
>> >> ) where  rownum = 1 and  price > 0;
>> >>
>> >>
>> >>
>> >>
>> >> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN
>> as
>> >> BIGINT),)
>> >>
>> >> from
>> >>
>> >> (
>> >>
>> >> select order_date as rowkey,
>> >>
>> >> sum(amount) as saleN,
>> >>
>> >> from order_source
>> >>
>> >> group by order_date
>> >>
>> >> );
>> >>
>> >>
>> >>
>> >>
>> >> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN
>> as
>> >> BIGINT))
>> >>
>> >> from
>> >>
>> >> (
>> >>
>> >> select order_hour as rowkey,sum(amount) as saleN,
>> >>
>> >>
>> >>
>> >> from order_source
>> >>
>> >> group by order_hour
>> >>
>> >> );
>> >> 问题:同一个view,相同的消费group,不同的sink,产生 2个job。 这样的话,相当于2个job公用一个consumer group。
>> >> 最后生成的job是 : a.  order_source  -> sink  1  b.  order_source  -> sink
>> >> 2
>> >>
>> >>
>> >> 本意是想通过view  order_source
>> >> (view里需要对订单数据去重)复用同一份source全量数据,对应底层可以复用同一份state数据 ,如何做到 ?
>> >>
>> >>
>>


Re: Re: FLINK SQL view的数据复用问题

2020-08-04 文章 godfrey he
调用 StatementSet#explain() 把结果打出来看看是否因 Deduplicate的digest不一样导致的没法复用

kandy.wang  于2020年8月4日周二 下午6:21写道:

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> @ godfrey
> thanks。刚试了一下,source -> Deduplicate  ->
> GlobalGroupAggregate,在souce端确实是复用了。但是Deduplicate 端是没有复用呢?理论上source +
> Deduplicate 都是view里的逻辑,都应该复用才对。就是感觉复用的还不够多呢。
>
>
> 在 2020-08-04 17:26:02,"godfrey he"  写道:
> >blink planner支持将多sink的query优化成尽量复用重复计算部分。
> >1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务
> >
> >kandy.wang  于2020年8月4日周二 下午5:20写道:
> >
> >> FLINK SQL view相关问题:
> >> create view order_source
> >>
> >> as
> >>
> >> select order_id, order_goods_id, user_id,...
> >>
> >> from (
> >>
> >> ..  proctime,row_number() over(partition by order_id,
> >> order_goods_id order by proctime desc) as rownum
> >>
> >> from hive.temp_dw.dm_trd_order_goods/*+ OPTIONS('
> properties.group.id'='flink_etl_kafka_hbase',
> >> 'scan.startup.mode'='latest-offset') */
> >>
> >> ) where  rownum = 1 and  price > 0;
> >>
> >>
> >>
> >>
> >> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN
> as
> >> BIGINT),)
> >>
> >> from
> >>
> >> (
> >>
> >> select order_date as rowkey,
> >>
> >> sum(amount) as saleN,
> >>
> >> from order_source
> >>
> >> group by order_date
> >>
> >> );
> >>
> >>
> >>
> >>
> >> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN
> as
> >> BIGINT))
> >>
> >> from
> >>
> >> (
> >>
> >> select order_hour as rowkey,sum(amount) as saleN,
> >>
> >>
> >>
> >> from order_source
> >>
> >> group by order_hour
> >>
> >> );
> >> 问题:同一个view,相同的消费group,不同的sink,产生 2个job。 这样的话,相当于2个job公用一个consumer group。
> >> 最后生成的job是 : a.  order_source  -> sink  1  b.  order_source  -> sink
> >> 2
> >>
> >>
> >> 本意是想通过view  order_source
> >> (view里需要对订单数据去重)复用同一份source全量数据,对应底层可以复用同一份state数据 ,如何做到 ?
> >>
> >>
>


Re:Re: FLINK SQL view的数据复用问题

2020-08-04 文章 kandy.wang















@ godfrey
thanks。刚试了一下,source -> Deduplicate  -> 
GlobalGroupAggregate,在souce端确实是复用了。但是Deduplicate 端是没有复用呢?理论上source + 
Deduplicate 都是view里的逻辑,都应该复用才对。就是感觉复用的还不够多呢。
 

在 2020-08-04 17:26:02,"godfrey he"  写道:
>blink planner支持将多sink的query优化成尽量复用重复计算部分。
>1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务
>
>kandy.wang  于2020年8月4日周二 下午5:20写道:
>
>> FLINK SQL view相关问题:
>> create view order_source
>>
>> as
>>
>> select order_id, order_goods_id, user_id,...
>>
>> from (
>>
>> ..  proctime,row_number() over(partition by order_id,
>> order_goods_id order by proctime desc) as rownum
>>
>> from hive.temp_dw.dm_trd_order_goods/*+ 
>> OPTIONS('properties.group.id'='flink_etl_kafka_hbase',
>> 'scan.startup.mode'='latest-offset') */
>>
>> ) where  rownum = 1 and  price > 0;
>>
>>
>>
>>
>> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as
>> BIGINT),)
>>
>> from
>>
>> (
>>
>> select order_date as rowkey,
>>
>> sum(amount) as saleN,
>>
>> from order_source
>>
>> group by order_date
>>
>> );
>>
>>
>>
>>
>> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as
>> BIGINT))
>>
>> from
>>
>> (
>>
>> select order_hour as rowkey,sum(amount) as saleN,
>>
>>
>>
>> from order_source
>>
>> group by order_hour
>>
>> );
>> 问题:同一个view,相同的消费group,不同的sink,产生 2个job。 这样的话,相当于2个job公用一个consumer group。
>> 最后生成的job是 : a.  order_source  -> sink  1  b.  order_source  -> sink
>> 2
>>
>>
>> 本意是想通过view  order_source
>> (view里需要对订单数据去重)复用同一份source全量数据,对应底层可以复用同一份state数据 ,如何做到 ?
>>
>>


Re: FLINK SQL view的数据复用问题

2020-08-04 文章 godfrey he
blink planner支持将多sink的query优化成尽量复用重复计算部分。
1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务

kandy.wang  于2020年8月4日周二 下午5:20写道:

> FLINK SQL view相关问题:
> create view order_source
>
> as
>
> select order_id, order_goods_id, user_id,...
>
> from (
>
> ..  proctime,row_number() over(partition by order_id,
> order_goods_id order by proctime desc) as rownum
>
> from hive.temp_dw.dm_trd_order_goods/*+ 
> OPTIONS('properties.group.id'='flink_etl_kafka_hbase',
> 'scan.startup.mode'='latest-offset') */
>
> ) where  rownum = 1 and  price > 0;
>
>
>
>
> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as
> BIGINT),)
>
> from
>
> (
>
> select order_date as rowkey,
>
> sum(amount) as saleN,
>
> from order_source
>
> group by order_date
>
> );
>
>
>
>
> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as
> BIGINT))
>
> from
>
> (
>
> select order_hour as rowkey,sum(amount) as saleN,
>
>
>
> from order_source
>
> group by order_hour
>
> );
> 问题:同一个view,相同的消费group,不同的sink,产生 2个job。 这样的话,相当于2个job公用一个consumer group。
> 最后生成的job是 : a.  order_source  -> sink  1  b.  order_source  -> sink
> 2
>
>
> 本意是想通过view  order_source
> (view里需要对订单数据去重)复用同一份source全量数据,对应底层可以复用同一份state数据 ,如何做到 ?
>
>


FLINK SQL view的数据复用问题

2020-08-04 文章 kandy.wang
FLINK SQL view相关问题:
create view order_source

as

select order_id, order_goods_id, user_id,...

from (

..  proctime,row_number() over(partition by order_id, order_goods_id 
order by proctime desc) as rownum

from hive.temp_dw.dm_trd_order_goods/*+ 
OPTIONS('properties.group.id'='flink_etl_kafka_hbase', 
'scan.startup.mode'='latest-offset') */

) where  rownum = 1 and  price > 0; 




insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as 
BIGINT),)

from

(

select order_date as rowkey,

sum(amount) as saleN,

from order_source

group by order_date

);




insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as 
BIGINT))

from

(

select order_hour as rowkey,sum(amount) as saleN,



from order_source

group by order_hour

);
问题:同一个view,相同的消费group,不同的sink,产生 2个job。 这样的话,相当于2个job公用一个consumer group。
最后生成的job是 : a.  order_source  -> sink  1  b.  order_source  -> sink  2


本意是想通过view  order_source (view里需要对订单数据去重)复用同一份source全量数据,对应底层可以复用同一份state数据 
,如何做到 ?