Hi Jark and Benchao

I have learned from your previous email  on how to do pv/uv in flink sql.
One is to make a yyyyMMdd grouping, the other is to make a day window.
Thank you all.

I have a question about the result output. For yyyyMMdd grouping, every
minute the database would get a record, and many records would be in the
database as time goes on, but there would be only a few records in the
database according to the day window.

for example, the pv would be 12:00,100   12:01,200  12:02,300   12:03,400
according to the yyyyMMdd grouping solution, for the day window solution,
there would be only one record as  12:00,100 |12:01,200|12:02,300|12:03,400.

I wonder, for the day window solution, is it possible to have the same
result output as the yyyyMMdd solution? because the day window solution has
no worry about the state retention.

Thanks.

Yours sincerely

Josh

On Sat, Apr 18, 2020 at 9:38 PM Jark Wu <[email protected]> wrote:

> Hi,
>
> I will use English because we are also sending to user@ ML.
>
> This behavior is as expected, not a bug. Benchao gave a good explanation
> about the reason. I will give some further explanation.
> In Flink SQL, we will split an update operation (such as uv from 100 ->
> 101) into two separate messages, one is -[key, 100], the other is +[key,
> 101].
> Once these two messages arrive the downstream aggregation, it will also
> send two result messages (assuming the previous SUM(uv) is 500),
> one is [key, 400], the other is [key, 501].
>
> But this problem is almost addressed since 1.9, if you enabled the
> mini-batch optimization [1]. Because mini-batch optimization will try best
> to the
> accumulate the separate + and - message in a single mini-batch processing.
> You can upgrade and have a try.
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
>
>
>
> On Sat, 18 Apr 2020 at 12:26, Benchao Li <[email protected]> wrote:
>
>> 这个按照目前的设计,应该不能算是bug,应该是by desigh的。
>> 主要问题还是因为有两层agg,第一层的agg的retract会导致第二层的agg重新计算和下发结果。
>>
>> dixingxing85 <[email protected]>于2020年4月18日 周六上午11:38写道:
>>
>>> 多谢benchao,
>>> 我这个作业的结果预期结果是每天只有一个结果,这个结果应该是越来越大的,比如:
>>> 20200417,86
>>> 20200417,90
>>> 20200417,130
>>> 20200417,131
>>>
>>> 而不应该是忽大忽小的,数字由大变小,这样的结果需求方肯定不能接受的:
>>> 20200417,90
>>> 20200417,86
>>> 20200417,130
>>> 20200417,86
>>> 20200417,131
>>>
>>> 我的疑问是内层的group by产生的retract流,会影响sink吗,我是在sink端打的日志。
>>> 如果flink支持这种两层group by的话,那这种结果变小的情况应该算是bug吧?
>>>
>>> Sent from my iPhone
>>>
>>> On Apr 18, 2020, at 10:08, Benchao Li <[email protected]> wrote:
>>>
>>> 
>>>
>>> Hi,
>>>
>>> 这个是支持的哈。
>>> 你看到的现象是因为group by会产生retract结果,也就是会先发送-[old],再发送+[new].
>>> 如果是两层的话,就成了:
>>> 第一层-[old], 第二层-[cur], +[old]
>>> 第一层+[new], 第二层[-old], +[new]
>>>
>>> [email protected] <[email protected]> 于2020年4月18日周六 上午2:11写道:
>>>
>>>>
>>>> Hi all:
>>>>
>>>> 我们有个streaming sql得到的结果不正确,现象是sink得到的数据一会大一会小,*我们想确认下,这是否是个bug,
>>>> 或者flink还不支持这种sql*。
>>>> 具体场景是:先group by A, B两个维度计算UV,然后再group by A 把维度B的UV sum起来,对应的SQL如下:(A
>>>> -> dt,  B -> pvareaid)
>>>>
>>>> SELECT dt, SUM(a.uv) AS uv
>>>> FROM (
>>>>    SELECT dt, pvareaid, COUNT(DISTINCT cuid) AS uv
>>>>    FROM streaming_log_event
>>>>    WHERE action IN ('action1')
>>>>       AND pvareaid NOT IN ('pv1', 'pv2')
>>>>       AND pvareaid IS NOT NULL
>>>>    GROUP BY dt, pvareaid
>>>> ) a
>>>> GROUP BY dt;
>>>>
>>>> sink接收到的数据对应日志为:
>>>>
>>>> 2020-04-17 22:28:38,727    INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
>>>> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
>>>> data(false,0,86,20200417)
>>>> 2020-04-17 22:28:38,727    INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
>>>> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
>>>> data(true,0,130,20200417)
>>>> 2020-04-17 22:28:39,327    INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
>>>> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
>>>> data(false,0,130,20200417)
>>>> 2020-04-17 22:28:39,327    INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
>>>> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
>>>> data(true,0,86,20200417)
>>>> 2020-04-17 22:28:39,327    INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
>>>> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
>>>> data(false,0,86,20200417)
>>>> 2020-04-17 22:28:39,328    INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
>>>> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
>>>> data(true,0,131,20200417)
>>>>
>>>>
>>>> 我们使用的是1.7.2, 测试作业的并行度为1。
>>>> 这是对应的 issue: https://issues.apache.org/jira/browse/FLINK-17228
>>>>
>>>>
>>>> ------------------------------
>>>> [email protected]
>>>>
>>>>
>>>
>>> --
>>>
>>> Benchao Li
>>> School of Electronics Engineering and Computer Science, Peking University
>>> Tel:+86-15650713730
>>> Email: [email protected]; [email protected]
>>>
>>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: [email protected]; [email protected]
>>
>>

回复