Hi all,

I am experiencing some performance issues when running joins in Beam on Dataflow. I've looked through the code but couldn't find the culprit. Perhaps I'm doing something wrong or inefficiently?

Code snippet:

    PCollection<List<KV<String, String>>> leftBeforeFormat = ...
    PCollection<KV<String, List<KV<String, String>>>> left =
        leftBeforeFormat.apply(ParDo.of(new GetJoinKey()));

    PCollection<List<KV<String, String>>> rightBeforeFormat = ...
    PCollection<KV<String, List<KV<String, String>>>> right =
        rightBeforeFormat.apply(ParDo.of(new GetJoinKey()));

To get the String key of the enclosing KV (the outer String in KV<String, List<KV<String, String>>>), I pick out the relevant KV objects from each List and convert them to a String. This is what the GetJoinKey() DoFn does.

    PCollectionView<List<String>> rightKeys =
        rightBeforeFormat.apply(new GenerateKeys());

The GenerateKeys() PTransform takes a single element of the input PCollection (each element is one List), outputs the key of each KV in that List, and returns a PCollectionView. This view is used to generate the correct null value for an outer join.

I then join using the standard join library:

    Join.leftOuterJoin(left, right, Collections.emptyList())
        .apply(ParDo.of(new JoinLists(rightKeys)).withSideInputs(rightKeys));

leftOuterJoin() returns a PCollection<KV<String, KV<List<KV<String, String>>, List<KV<String, String>>>>>. The JoinLists() DoFn combines the two Lists that come from the left and right PCollections.

When running on Dataflow with 10 workers, with a left collection of ~770,000 elements and a right collection of 688 elements, the Join step starts off processing around 2,000-3,000 elements per second but then drops to ~100 elements per second. The job scales up, but this doesn't seem to help.

[image: pipeline.jpg]

I know that when the right collection is much smaller you can use side inputs instead, so I have tried this and it seems to help. However, I have another join where both collections have 700,000-800,000 elements each (so side inputs might not be appropriate), and it shows the same slowdown.

Any ideas where the slowness might be coming from?

Thanks,
Joe
