Hi Jark,

Thanks for your reply. Indeed, I forgot to write DISTINCT in the query.
With it, the query plan now splits into two hash partition phases.
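
As a rough illustration of what the two hash phases compute, here is a
minimal plain-Java sketch (not Flink code, and not the planner's actual
implementation). The first phase groups by startDate plus a bucket derived
from driverId; the second groups by startDate only and sums the per-bucket
distinct counts. The class name, method name, and the bucket count of 4 are
assumptions made up for this example.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-in for COUNT(DISTINCT driverId) ... GROUP BY startDate,
// evaluated in two hash phases.
class SplitDistinctSketch {

    static Map<String, Long> countDistinctSplit(String[] startDates, long[] driverIds, int buckets) {
        // Phase 1: key = (startDate, bucket). Each driverId always lands in the
        // same bucket, so the per-bucket distinct sets never overlap.
        Map<String, Map<Integer, Set<Long>>> partial = new HashMap<>();
        for (int i = 0; i < startDates.length; i++) {
            int bucket = Math.floorMod(Long.hashCode(driverIds[i]), buckets);
            partial.computeIfAbsent(startDates[i], d -> new HashMap<>())
                   .computeIfAbsent(bucket, b -> new HashSet<>())
                   .add(driverIds[i]);
        }

        // Phase 2: key = startDate. Summing the disjoint per-bucket counts
        // yields the overall distinct count for that date.
        Map<String, Long> result = new TreeMap<>();
        for (Map.Entry<String, Map<Integer, Set<Long>>> e : partial.entrySet()) {
            long total = 0;
            for (Set<Long> distinctInBucket : e.getValue().values()) {
                total += distinctInBucket.size();
            }
            result.put(e.getKey(), total);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] dates = {"2020-11-09", "2020-11-09", "2020-11-09", "2020-11-10"};
        long[] drivers = {1L, 2L, 1L, 7L};
        // prints {2020-11-09=2, 2020-11-10=1}
        System.out.println(countDistinctSplit(dates, drivers, 4));
    }
}
```

The point of the extra bucket key is that a date with many rides is spread
over several parallel instances in the first phase instead of one.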

What do you mean by deterministic time? Why is only the window aggregate
deterministic? If I implement the ProcessingTimeCallback [1]
interface, is it deterministic?
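
To make the question concrete, the two trigger styles from point 2 of the
quoted reply can be sketched in plain Java. This is only a simplified model
under stated assumptions, not how Flink implements either one: the class and
method names are hypothetical, and the timeout is only checked when the next
record arrives rather than by a timer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the two trigger styles; not Flink code.
class TriggerSketch {

    // Mini-batch style: flush when maxSize records are buffered OR when
    // maxLatencyMs has passed since the first buffered record. The resulting
    // batch sizes depend on how the records happen to arrive.
    static List<Integer> miniBatchSizes(long[] arrivalMs, int maxSize, long maxLatencyMs) {
        List<Integer> sizes = new ArrayList<>();
        int buffered = 0;
        long batchStart = 0;
        for (long t : arrivalMs) {
            if (buffered > 0 && t - batchStart >= maxLatencyMs) {
                sizes.add(buffered); // flushed by timeout
                buffered = 0;
            }
            if (buffered == 0) {
                batchStart = t;
            }
            buffered++;
            if (buffered == maxSize) {
                sizes.add(buffered); // flushed by size
                buffered = 0;
            }
        }
        if (buffered > 0) {
            sizes.add(buffered); // flush whatever is left at the end of input
        }
        return sizes;
    }

    // Tumbling-window style: the window a record falls into depends only on
    // its timestamp and the window size, never on the arrival pattern.
    static long windowStart(long timestampMs, long windowMs) {
        return timestampMs - (timestampMs % windowMs);
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 100, 200, 5000, 5100};
        // prints [2, 1, 2]
        System.out.println(miniBatchSizes(arrivals, 2, 3000));
        // prints 3000
        System.out.println(windowStart(5100, 3000));
    }
}
```

In this model the same records with different arrival times would produce
different batch boundaries, while the window boundaries stay fixed.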

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.html

Thanks

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Tue, Nov 10, 2020 at 7:55 AM Jark Wu <imj...@gmail.com> wrote:
>
> Hi Felipe,
>
> The "Split Distinct Aggregation", i.e. the 
> "table.optimizer.distinct-agg.split.enabled" option,
>  only works for distinct aggregations (e.g. COUNT(DISTINCT ...)).
>
> However, the query in your example is using COUNT(driverId).
> You can update it to COUNT(DISTINCT driverId), and it should show two hash 
> phases.
>
> Regarding "MiniBatch Aggregation", it is not the same as a processing-time 
> window aggregation.
>
> 1) MiniBatch is just an optimization for unbounded aggregation: it buffers
> some input records in memory and processes them together to reduce state
> access. A processing-time window, by contrast, still accesses state once
> per record. Besides, the local aggregation also applies mini-batch: it
> sends only the accumulator of the current mini-batch to the downstream
> global aggregation, which improves performance a lot.
> 2) The size of a MiniBatch is not deterministic: it may be triggered by the
> number of records or by a timeout.
>   A window aggregate, in contrast, is triggered at a deterministic time.
>
>
> Best,
> Jark
>
>
> On Mon, 9 Nov 2020 at 21:45, Felipe Gutierrez <felipe.o.gutier...@gmail.com> 
> wrote:
>>
>> I realized that I forgot the image. Now it is attached.
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>> -- https://felipeogutierrez.blogspot.com
>>
>> On Mon, Nov 9, 2020 at 1:41 PM Felipe Gutierrez
>> <felipe.o.gutier...@gmail.com> wrote:
>> >
>> > Hi community,
>> >
>> > I am testing the "Split Distinct Aggregation" [1] while consuming the
>> > taxi ride data set. My SQL query from the table environment is the one
>> > below:
>> >
>> > Table tableCountDistinct = tableEnv.sqlQuery(
>> >     "SELECT startDate, COUNT(driverId) FROM TaxiRide GROUP BY startDate");
>> >
>> > and I am enabling:
>> > configuration.setString("table.exec.mini-batch.enabled", "true");
>> > configuration.setString("table.exec.mini-batch.allow-latency", "3 s");
>> > configuration.setString("table.exec.mini-batch.size", "5000");
>> > configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
>> > and finally
>> > configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
>> >
>> > I was expecting the query plan in the web UI to show two hash
>> > phases, as in the image at [1]. Instead, it shows the same plan
>> > with one hash phase, as if I were deploying only one
>> > Local aggregate and one Global aggregate (taking the
>> > parallel instances into consideration, of course). Please see the
>> > attached image of the query execution plan.
>> >
>> > Is there something that I am missing when I configure the Table API?
>> > By the way, I am a bit confused by the "MiniBatch Aggregation" [2].
>> > Does the "MiniBatch Aggregation" aggregate like a processing-time window
>> > on the operator after the hash phase? If so, isn't it the same as a
>> > window aggregation, rather than the unbounded window that the example
>> > presents?
>> >
>> > Thanks!
>> > Felipe
>> >
>> > [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
>> > [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
>> > --
>> > -- Felipe Gutierrez
>> > -- skype: felipe.o.gutierrez
>> > -- https://felipeogutierrez.blogspot.com
