Hi all,

I am experiencing some performance issues when running joins in Beam on
Dataflow. I've looked through the code but couldn't find the culprit.
Perhaps I'm doing something wrong, or something inefficient?

Code snippet:

PCollection<List<KV<String, String>>> leftBeforeFormat = ...
PCollection<KV<String, List<KV<String, String>>>> left =
    leftBeforeFormat.apply(ParDo.of(new GetJoinKey()));
PCollection<List<KV<String, String>>> rightBeforeFormat = ...
PCollection<KV<String, List<KV<String, String>>>> right =
    rightBeforeFormat.apply(ParDo.of(new GetJoinKey()));

To produce the String key of each outer KV (the first type parameter of
KV<String, List<KV<String, String>>>), the GetJoinKey() DoFn picks out the
relevant KV objects from the List and converts them to a String.
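For reference, the idea behind GetJoinKey() can be modelled in plain Java as
follows. This is a sketch, not the actual DoFn: Map.Entry stands in for Beam's
KV so it runs without Beam, and the keyFields set and the "|" delimiter are
hypothetical choices for illustration.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Plain-Java model of the GetJoinKey() logic; no Beam dependency.
// Map.Entry stands in for Beam's KV. keyFields and the "|" delimiter are
// hypothetical, not the real DoFn's parameters.
final class GetJoinKeySketch {
    private GetJoinKeySketch() {}

    static Map.Entry<String, List<Map.Entry<String, String>>> withJoinKey(
            List<Map.Entry<String, String>> row, Set<String> keyFields) {
        // Pick out the entries that make up the key (in list order) and
        // concatenate their values into a single String.
        String joinKey = row.stream()
                .filter(e -> keyFields.contains(e.getKey()))
                .map(Map.Entry::getValue)
                .collect(Collectors.joining("|"));
        // Pair the key with the untouched row, mirroring
        // KV<String, List<KV<String, String>>>.
        return new SimpleImmutableEntry<>(joinKey, row);
    }
}
```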

PCollectionView<List<String>> rightKeys =
    rightBeforeFormat.apply(new GenerateKeys());

The GenerateKeys() PTransform takes a single element of the input
PCollection (each element is one List), outputs the key of each KV in that
List, and returns the keys as a PCollectionView. The view is used to generate
the correct null value for the outer join.
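A plain-Java sketch of what GenerateKeys() computes. It assumes every element
of the right collection has the same keys, so inspecting one element is
enough; the class and method names are hypothetical, and in the pipeline the
result would be exposed as a PCollectionView rather than returned.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Plain-Java model of GenerateKeys(): take any single row and list the key
// of each entry. Assumes all rows share the same keys; Map.Entry stands in
// for Beam's KV.
final class GenerateKeysSketch {
    private GenerateKeysSketch() {}

    static List<String> keysOfOneRow(Collection<List<Map.Entry<String, String>>> rows) {
        List<String> keys = new ArrayList<>();
        if (rows.isEmpty()) {
            return keys; // no rows, so no keys to report
        }
        // Only the first row returned by the iterator is inspected.
        for (Map.Entry<String, String> e : rows.iterator().next()) {
            keys.add(e.getKey());
        }
        return keys;
    }
}
```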

I then join using the standard library:

Join.leftOuterJoin(left, right, Collections.emptyList())
    .apply(ParDo.of(new JoinLists(rightKeys)).withSideInputs(rightKeys));

leftOuterJoin() returns a
PCollection<KV<String, KV<List<KV<String, String>>, List<KV<String, String>>>>>.
The JoinLists() DoFn then combines the two Lists, one from the left
PCollection and one from the right, into a single List.
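A plain-Java sketch of the JoinLists() combining step for one joined element.
The assumption here is that an unmatched left element arrives with the
empty-list null value passed to leftOuterJoin(), and is then padded with one
null-valued entry per key in rightKeys; the class and method names are
hypothetical.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of JoinLists() for one element KV<key, KV<leftList, rightList>>.
// Map.Entry stands in for Beam's KV.
final class JoinListsSketch {
    private JoinListsSketch() {}

    static List<Map.Entry<String, String>> combine(
            List<Map.Entry<String, String>> leftRow,
            List<Map.Entry<String, String>> rightRow,
            List<String> rightKeys) {
        List<Map.Entry<String, String>> combined = new ArrayList<>(leftRow);
        if (rightRow.isEmpty()) {
            // No match on the right: emit every right-hand key with a null value.
            // SimpleImmutableEntry is used because Map.entry() rejects nulls.
            for (String key : rightKeys) {
                combined.add(new SimpleImmutableEntry<String, String>(key, null));
            }
        } else {
            combined.addAll(rightRow);
        }
        return combined;
    }
}
```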

When running on Dataflow with 10 workers, with a left collection of
~770,000 elements and a right collection of 688 elements, the Join step
starts off processing around 2,000-3,000 elements a second but then drops to
~100 elements a second. The job scales up, but this doesn't seem to help.

[image: pipeline.jpg]

I know that when the right collection is much smaller you can use a side
input instead, and doing so seems to help here. However, I have another Join
where both collections have 700,000-800,000 elements each (so a side input
might not be appropriate), and it shows the same slowdown. Any ideas where
the slowness might be coming from?
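For comparison, this is roughly what the side-input version does, modelled in
plain Java: the small right collection is indexed once into a map and each
left element is looked up in it, so the large side is never grouped by key.
It is a sketch of a broadcast hash left outer join with hypothetical names,
not Beam's Join library; Map.Entry stands in for KV.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a side-input (broadcast hash) left outer join.
// Output mirrors KV<K, KV<V1, V2>>; unmatched left elements get nullValue.
final class SideInputJoinSketch {
    private SideInputJoinSketch() {}

    private static <A, B> Map.Entry<A, B> pair(A a, B b) {
        return new SimpleImmutableEntry<>(a, b);
    }

    static <K, L, R> List<Map.Entry<K, Map.Entry<L, R>>> leftOuterJoin(
            List<Map.Entry<K, L>> left, List<Map.Entry<K, R>> right, R nullValue) {
        // Build the lookup table from the small (right) side, keeping duplicate keys.
        Map<K, List<R>> index = new HashMap<>();
        for (Map.Entry<K, R> r : right) {
            index.computeIfAbsent(r.getKey(), k -> new ArrayList<>()).add(r.getValue());
        }
        List<Map.Entry<K, Map.Entry<L, R>>> out = new ArrayList<>();
        for (Map.Entry<K, L> l : left) {
            List<R> matches = index.get(l.getKey());
            if (matches == null) {
                // No match: pair the left value with the supplied null value.
                out.add(pair(l.getKey(), pair(l.getValue(), nullValue)));
            } else {
                // One output per matching right value, as in a relational join.
                for (R r : matches) {
                    out.add(pair(l.getKey(), pair(l.getValue(), r)));
                }
            }
        }
        return out;
    }
}
```

In Beam, the lookup map would typically come from applying View.asMultimap()
to the small collection and reading it as a side input inside a ParDo.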

Thanks,
Joe
