Hello - I have a large number of pairs of files. For the purpose of discussion: /source1/{1..10000} and /source2/{1..10000}.
I want to join the files pair-wise: /source1/1 joined to /source2/1, /source1/2 joined to /source2/2, and so on. I then want to union the results of the pair-wise joins and perform an aggregate.

I created a simple Flink job that has four sources, two joins, and two sinks to produce intermediate results. This represents two unrelated chains. When I run this job with parallelism = 1 on a standalone machine with one task manager and 3 slots, only one slot gets used. My concern is that when I scale up to a YARN cluster, Flink will continue to use one slot on one machine instead of using all slots on all machines.

Prior reading suggests that all the data source subtasks are added to a default resource group, and that downstream tasks (joins and sinks) want to be colocated with the data sources. The result is that all of my tasks are executed in one slot. The streaming API (DataStream) offers the slotSharingGroup() function, but this doesn't seem to be available to the DataSet user.

*Q1:* How do I force Flink to distribute work evenly across task managers and the slots allocated to them? If this shouldn't be a concern, please elaborate.

When I scale up the number of unrelated chains, I notice that Flink seems to start all of them at the same time, which results in thrashing and errors: lots of IO and errors regarding hash buffers.

*Q2:* Is there any method for controlling the scheduling of tasks so that some finish before others start? My workaround is to execute multiple, sequential batches with results going into an intermediate directory, and then a final job that aggregates the results. I would certainly prefer one job that might avoid the intermediate write.

If I treat /source1 as one data source and /source2 as the second, and then join the two, Flink will shuffle and partition the files on the join key. The /source1 and /source2 files already represent this partitioning. They are reused multiple times; thus, I shuffled once and saved the results, creating /source1 and /source2.
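To illustrate why the shuffle should be unnecessary for the pre-partitioned layout described above, here is a minimal sketch in plain Java. It does not use the Flink API, and the class name, the Rec type, and the key-modulo partitioning scheme are all made up for illustration. It assumes both inputs were split with the same function of the join key, and checks that joining partition i with partition i and unioning the results gives the same records as one global join:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CoPartitionedJoinSketch {

    // Hypothetical keyed record: 'key' is the join key, 'value' is the payload.
    public static final class Rec {
        public final int key;
        public final String value;

        public Rec(int key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Split records into n partitions by key modulo n (assumed partitioning scheme,
    // standing in for whatever the original shuffle on the join key did).
    public static List<List<Rec>> partition(List<Rec> input, int n) {
        List<List<Rec>> parts = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            parts.add(new ArrayList<>());
        }
        for (Rec r : input) {
            parts.get(Math.floorMod(r.key, n)).add(r);
        }
        return parts;
    }

    // Naive nested-loop inner equi-join on key; each match becomes "key:left|right".
    public static List<String> join(List<Rec> left, List<Rec> right) {
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                if (l.key == r.key) {
                    out.add(l.key + ":" + l.value + "|" + r.value);
                }
            }
        }
        return out;
    }

    // Join partition i of 'a' with partition i of 'b', then union all results.
    public static List<String> pairwiseJoin(List<List<Rec>> a, List<List<Rec>> b) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < a.size(); i++) {
            out.addAll(join(a.get(i), b.get(i)));
        }
        return out;
    }

    // Return a sorted copy so two result sets can be compared regardless of order.
    public static List<String> sorted(List<String> in) {
        List<String> copy = new ArrayList<>(in);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        List<Rec> source1 = new ArrayList<>();
        List<Rec> source2 = new ArrayList<>();
        for (int k = 0; k < 12; k++) {
            source1.add(new Rec(k, "L" + k));
        }
        for (int k = 0; k < 12; k += 2) {
            source2.add(new Rec(k, "R" + k));
        }

        List<String> global = sorted(join(source1, source2));
        List<String> pairwise = sorted(
                pairwiseJoin(partition(source1, 4), partition(source2, 4)));

        System.out.println("pair-wise result equals global result: "
                + global.equals(pairwise));
    }
}
```

The nested-loop join is only there to keep the sketch short. The point is that records with equal keys always land at the same partition index on both sides, so no exchange between partitions is needed.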
*Q3:* Does Flink have a method by which I can mark individual files (or directories) as belonging to a particular partition, so that when I join them the unnecessary shuffle and repartition is avoided?

Thank you,
David

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/