Hi Felipe,

Your source is not parallel, so it doesn't make sense to make the local group
operator parallel: the local aggregation runs with the parallelism of its input.
If the source implemented ParallelSourceFunction, the subsequent operators
would be parallelized too.
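One caveat if you go that way: Flink runs a separate copy of a ParallelSourceFunction's run() method in every parallel subtask. A source that reads the whole input in each copy would therefore emit every record once per subtask. A common way to avoid that is to shard the input by subtask index. The sketch below models only that sharding decision in plain Java so it runs without Flink. The class SourceSharding and the method recordsForSubtask are hypothetical names. In a real RichParallelSourceFunction, the two int parameters would come from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks().

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not a Flink API): decides which records one parallel
 * source subtask should emit, so that all subtasks together emit every
 * record exactly once.
 */
public class SourceSharding {

    /** Round-robin by position: subtask i takes records i, i + n, i + 2n, ... */
    public static <T> List<T> recordsForSubtask(List<T> records, int subtaskIndex, int numSubtasks) {
        if (numSubtasks <= 0 || subtaskIndex < 0 || subtaskIndex >= numSubtasks) {
            throw new IllegalArgumentException("invalid subtask index or parallelism");
        }
        List<T> mine = new ArrayList<>();
        for (int i = subtaskIndex; i < records.size(); i += numSubtasks) {
            mine.add(records.get(i));
        }
        return mine;
    }

    public static void main(String[] args) {
        List<String> rides = List.of("r0", "r1", "r2", "r3", "r4", "r5", "r6");
        // 4 mirrors table.exec.resource.default-parallelism from the question
        int parallelism = 4;
        for (int s = 0; s < parallelism; s++) {
            System.out.println("subtask " + s + " -> " + recordsForSubtask(rides, s, parallelism));
        }
    }
}
```

With seven records and parallelism 4, subtask 0 gets [r0, r4] and subtask 3 gets only [r3]. Round-robin by position is just one choice. A file-based source might instead assign whole files or splits to each subtask.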

Regards,
Roman


On Thu, Oct 8, 2020 at 5:00 PM Felipe Gutierrez <
[email protected]> wrote:

> Hi community,
>
> I was implementing a stream aggregation using the Table API [1] and
> trying out the local-global aggregation plan to optimize the query.
> Basically, I configured it like this:
>
> Configuration configuration = tableEnv.getConfig().getConfiguration();
> // set low-level key-value options
> configuration.setInteger("table.exec.resource.default-parallelism", 4);
> // local-global aggregation requires mini-batch to be enabled
> configuration.setString("table.exec.mini-batch.enabled", "true");
> configuration.setString("table.exec.mini-batch.allow-latency", "1 s");
> configuration.setString("table.exec.mini-batch.size", "1000");
> // enable two-phase, i.e. local-global aggregation
> configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
>
> When I looked at the query plan on the dashboard, I noticed that the
> LocalGroupAggregate runs with parallelism 1 while the
> GlobalGroupAggregate runs with parallelism 4. Why isn't the
> LocalGroupAggregate also running with parallelism 4, given that I set it
> via the property "table.exec.resource.default-parallelism"? Here is my
> code [2].
>
> Thanks,
> Felipe
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html
> [2]
> https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/table/TaxiRideCountTable.java
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
