Jeff - does setting the global default work for you, or do you need per-operator control? It seems like it would be reasonable to add this to ResourceHints.
On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <rober...@google.com> wrote:

Yeah, I don't think we have a good per-operator API for this. If we were to add it, it probably belongs in ResourceHints.

On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <re...@google.com> wrote:

Looking at FlinkPipelineOptions, there is a parallelism option you can set. I believe this sets the default parallelism for all Flink operators.

On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <zjf...@gmail.com> wrote:

Thanks Holden, this would work for Spark, but Flink doesn't have such a mechanism, so I am looking for a general solution on the Beam side.

On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <hol...@pigscanfly.ca> wrote:

To a (small) degree Spark's "new" AQE might be able to help, depending on what kind of operations Beam is compiling it down to. Have you tried setting spark.sql.adaptive.enabled & spark.sql.adaptive.coalescePartitions.enabled?

On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <user@beam.apache.org> wrote:

I see. Robert - what is the story for parallelism controls on GBK with the Spark or Flink runners?

On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <zjf...@gmail.com> wrote:

No, I don't use Dataflow; I use Spark & Flink.

On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <re...@google.com> wrote:

Are you running on the Dataflow runner? If so, Dataflow - unlike Spark and Flink - dynamically modifies the parallelism as the operator runs, so there is no need for such controls. In fact, these specific controls wouldn't make much sense for the way Dataflow implements these operators.

On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zjf...@gmail.com> wrote:

Just for performance tuning, like in Spark and Flink.
On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <user@beam.apache.org> wrote:

What are you trying to achieve by setting the parallelism?

On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zjf...@gmail.com> wrote:

Thanks Reuven, what I mean is setting the parallelism at the operator level, and the input size of an operator is unknown at compile time unless it is a source operator.

Here's an example from Flink:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level

Spark also supports setting operator-level parallelism (see groupByKey and reduceByKey):
https://spark.apache.org/docs/latest/rdd-programming-guide.html

On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <user@beam.apache.org> wrote:

The maximum parallelism is always determined by the parallelism of your data. If you do a GroupByKey, for example, the number of keys in your data determines the maximum parallelism.

Beyond the limitations in your data, it depends on your execution engine. If you're using Dataflow, Dataflow is designed to automatically determine the parallelism (e.g. work will be dynamically split and moved around between workers, the number of workers will autoscale, etc.), so there's no need to explicitly set the parallelism of the execution.

On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <zjf...@gmail.com> wrote:

Besides the global parallelism of a Beam job, is there any way to set parallelism for individual operators like group by and join?
I understand the parallelism setting depends on the underlying execution engine, but it is very common to set parallelism for operations like group by and join in both Spark & Flink.

--
Best Regards

Jeff Zhang

--
Twitter: https://twitter.com/holdenkarau
Books (Learning Spark, High Performance Spark, etc.): https://amzn.to/2MaRAG9
YouTube Live Streams: https://www.youtube.com/user/holdenkarau