Hi Felipe, The "Split Distinct Aggregation", i.e. the "table.optimizer.distinct-agg.split.enabled" option, only works for distinct aggregations (e.g. COUNT(DISTINCT ...)).
However, the query in your example is using COUNT(driverId). You can update it to COUNT(DISTINCT driverId), and it should show two hash phases. Regarding "MiniBatch Aggregation", it is not the same as a processing-time window aggregation. 1) MiniBatch is just an optimization on unbounded aggregation, it buffers some input records in memory and processes them together to reduce the state accessing. But processing-time window is still a per-record state accessing style. Besides, the local aggregation also applies mini-batch, it only sends the accumulator of current this mini-batch to the downstream global aggregation, and this improves performance a lot. 2) The size of MiniBach is not deterministic. It may be triggered by the number of records or a timeout. But a window aggregate is triggered by a deterministic time. Best, Jark On Mon, 9 Nov 2020 at 21:45, Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote: > I realized that I forgot the image. Now it is attached. > -- > -- Felipe Gutierrez > -- skype: felipe.o.gutierrez > -- https://felipeogutierrez.blogspot.com > > On Mon, Nov 9, 2020 at 1:41 PM Felipe Gutierrez > <felipe.o.gutier...@gmail.com> wrote: > > > > Hi community, > > > > I am testing the "Split Distinct Aggregation" [1] consuming the taxi > > ride data set. My sql query from the table environment is the one > > below: > > > > Table tableCountDistinct = tableEnv.sqlQuery("SELECT startDate, > > COUNT(driverId) FROM TaxiRide GROUP BY startDate"); > > > > and I am enableing: > > configuration.setString("table.exec.mini-batch.enabled", "true"); > > configuration.setString("table.exec.mini-batch.allow-latency", "3 s"); > > configuration.setString("table.exec.mini-batch.size", "5000"); > > configuration.setString("table.optimizer.agg-phase-strategy", > "TWO_PHASE"); > > and finally > > configuration.setString("table.optimizer.distinct-agg.split.enabled", > "true"); > > > > I was expecting that the query plan at the WEB UI show to me two hash > > phases as it is present here on the image [1]. Instead, it is showing > > to me the same plan with one hash phase as I was deploying only one > > Local aggregate and one Global aggregate (of course, taking the > > parallel instances into consideration). Please see the query execution > > plan image attached. > > > > Is there something that I am missing when I config the Table API? > > By the way, I am a bit confused with the "MiniBatch Aggregation" [2]. > > Is the "MiniBatch Aggregation" aggregating as a processing time window > > on the operator after the hash phase? If it is, isn't it the same as a > > window aggregation instead of an unbounded window as the example > > presents? > > > > Thanks! > > Felipe > > > > [1] > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation > > [2] > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation > > -- > > -- Felipe Gutierrez > > -- skype: felipe.o.gutierrez > > -- https://felipeogutierrez.blogspot.com >