Same need here, using the Flink runner. We are processing a PCollection (extracting features per element), then combining these into groups of features and running the next operator on those groups. Each group contains ~50 elements, so the parallelism of the operator upstream of the group-by should be higher, to be balanced with the downstream operator.

On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <[email protected]> wrote:

Hi Reuven,

It would be better to set parallelism per operator. As I mentioned before, there may be multiple group-by and join operators in one pipeline, and their parallelism can differ due to different input data sizes.

On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <[email protected]> wrote:

Jeff - does setting the global default work for you, or do you need per-operator control? Seems like it would be natural to add this to ResourceHints.

On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <[email protected]> wrote:

Yeah, I don't think we have a good per-operator API for this. If we were to add it, it probably belongs in ResourceHints.

On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <[email protected]> wrote:

Looking at FlinkPipelineOptions, there is a parallelism option you can set. I believe this sets the default parallelism for all Flink operators.

On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <[email protected]> wrote:

Thanks Holden, this would work for Spark, but Flink doesn't have that kind of mechanism, so I am looking for a general solution on the Beam side.

On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <[email protected]> wrote:

To a (small) degree Spark's "new" AQE might be able to help, depending on what kind of operations Beam is compiling it down to. Have you tried setting spark.sql.adaptive.enabled and spark.sql.adaptive.coalescePartitions.enabled?

On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <[email protected]> wrote:

I see. Robert - what is the story for parallelism controls on GBK with the Spark or Flink runners?
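For reference, the global default Reuven mentions can be set via Beam pipeline options; a minimal sketch (config only — the `--parallelism` flag is the Flink runner's single job-wide knob, with no per-operator equivalent, hence the ResourceHints discussion):

```python
from apache_beam.options.pipeline_options import PipelineOptions

# Sketch: the Flink runner's global default parallelism.
# This applies to every operator in the job; Beam exposes no
# per-operator override today.
options = PipelineOptions([
    "--runner=FlinkRunner",
    "--parallelism=8",  # default parallelism for all Flink operators
])
```

These options would then be passed to `beam.Pipeline(options=options)` as usual.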
On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <[email protected]> wrote:

No, I don't use Dataflow, I use Spark & Flink.

On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <[email protected]> wrote:

Are you running on the Dataflow runner? If so, Dataflow - unlike Spark and Flink - dynamically modifies the parallelism as the operator runs, so there is no need for such controls. In fact, these specific controls wouldn't make much sense for the way Dataflow implements these operators.

On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <[email protected]> wrote:

Just for performance tuning, as in Spark and Flink.

On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <[email protected]> wrote:

What are you trying to achieve by setting the parallelism?

On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <[email protected]> wrote:

Thanks Reuven. What I mean is to set the parallelism at the operator level, and the input size of an operator is unknown at compile time if it is not a source operator.

Here's an example from Flink:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
Spark also supports setting operator-level parallelism (see groupByKey and reduceByKey):
https://spark.apache.org/docs/latest/rdd-programming-guide.html

On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <[email protected]> wrote:

The maximum parallelism is always determined by the parallelism of your data.
If you do a GroupByKey, for example, the number of keys in your data determines the maximum parallelism.

Beyond the limitations in your data, it depends on your execution engine. If you're using Dataflow, Dataflow is designed to automatically determine the parallelism (e.g. work will be dynamically split and moved around between workers, the number of workers will autoscale, etc.), so there's no need to explicitly set the parallelism of the execution.

On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <[email protected]> wrote:

Besides the global parallelism of a Beam job, is there any way to set parallelism for individual operators like group-by and join? I understand the parallelism setting depends on the underlying execution engine, but it is very common to set parallelism for operations like group-by and join in both Spark & Flink.
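Reuven's point that the key count bounds post-GroupByKey parallelism can be illustrated with a tiny sketch (plain Python, not runner code — all values for one key must be processed together, so distinct keys are the ceiling on useful workers):

```python
def max_gbk_parallelism(elements):
    """Upper bound on useful parallelism after a GroupByKey:
    the number of distinct keys in the (key, value) pairs."""
    return len({k for k, _ in elements})

data = [("a", 1), ("b", 2), ("a", 3), ("c", 4)]
# Only 3 distinct keys, so at most 3 workers can do useful work
# downstream of the GroupByKey, regardless of any parallelism setting.
print(max_gbk_parallelism(data))  # 3
```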
--
Best Regards

Jeff Zhang
