The optimization you have made is to force the V1 iterable to reside entirely
in memory, since it is now counted as a single element. This will fall apart
as soon as your V1 iterable exceeds memory. Runners like Dataflow allow
re-iteration of a GBK/CoGBK result, which lets that result exceed the size of
memory, but this currently only works at the first level of the value
iterable, meaning that the entire Iterable<V1> is treated as a single value in
your Join.someJoin. You should see similar performance if you take all the V1s
out of the CoGBK, "copy" them into an ArrayList inside your DoFn, and then
walk the V2 iterable and the in-memory list, performing the outer join
yourself; see the sketch below. It will also likely be easier to reason about.
Note that Dataflow doesn't handle this re-iteration well and re-iterates many
more times than it should need to, which is why your perf numbers are
ballooning.
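
A rough, untested sketch of that (the tags, the K/V1/V2 types, and the
outer-join handling are placeholders you'd fill in for your pipeline):

final TupleTag<V1> v1Tag = new TupleTag<>();
final TupleTag<V2> v2Tag = new TupleTag<>();

KeyedPCollectionTuple.of(v1Tag, kv1)
    .and(v2Tag, kv2)
    .apply(CoGroupByKey.create())
    .apply(ParDo.of(new DoFn<KV<K, CoGbkResult>, KV<V1, V2>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // Copy the V1 side into memory once; assumes the per-key V1s fit on a worker.
        List<V1> v1s = Lists.newArrayList(c.element().getValue().getAll(v1Tag));
        // Walk the V2 iterable once, joining against the in-memory list.
        for (V2 v2 : c.element().getValue().getAll(v2Tag)) {
          for (V1 v1 : v1s) {
            c.output(KV.of(v1, v2));
          }
        }
        // Emit whatever null/default pairs you need here for the outer-join
        // cases where one side is empty.
      }
    }));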

Alternatively, have you tried putting one of the PCollection<K, V1> /
PCollection<K, V2> into a multimap side input and then just doing a GBK on the
other PCollection, followed by a DoFn that joins the two together using the
multimap side input?
Whether V1 or V2 works better as the side input depends on the relative sizes
of the PCollections and on whether the working set of the side-input
PCollection can be cached in memory (good for a side input), or whether the
grouped PCollection is sparse enough that it won't matter even if every lookup
is a cache miss.
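
A minimal, untested sketch of the side input variant (here kv1 feeds the
multimap side input; swap the sides depending on which fits your workers'
cache, and add whatever null handling your outer join needs):

final PCollectionView<Map<K, Iterable<V1>>> v1View =
    kv1.apply(View.<K, V1>asMultimap());

kv2.apply(GroupByKey.<K, V2>create())
    .apply(ParDo.of(new DoFn<KV<K, Iterable<V2>>, KV<V1, V2>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // Look up this key's V1s in the multimap side input.
        Iterable<V1> v1s = c.sideInput(v1View).get(c.element().getKey());
        if (v1s == null) {
          return; // emit null/default pairs here for outer-join semantics
        }
        for (V2 v2 : c.element().getValue()) {
          for (V1 v1 : v1s) {
            c.output(KV.of(v1, v2));
          }
        }
      }
    }).withSideInputs(v1View));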


On Mon, Feb 12, 2018 at 1:53 PM, Jacob Marble <jmar...@kochava.com> wrote:

> When joining (Join.leftOuterJoin, etc.) a PCollection<K, V1> to a
> PCollection<K, V2>, and K:V1 contains hot keys, my pipeline gets very slow;
> it can push processing time from hours to days.
>
> Reading this blog post
> <https://cloud.google.com/blog/big-data/2016/02/writing-dataflow-pipelines-with-scalability-in-mind>
> I can see some thought has already been given to this problem:
> "To address this, we allow you to provide extra parallelism hints using
> the Combine.PerKey.withHotKeyFanout or Combine.Globally.withFanout. These
> operations will create an extra step in your pipeline to pre-aggregate the
> data on many machines before performing the final aggregation on the target
> machines."
>
> (1 of 2)
>
> These two solutions, Combine.PerKey.withHotKeyFanout and
> Combine.Globally.withFanout, do not help with a join (CoGBK) operation,
> however. So, I solved my problem by adding these stages before and after the
> join operation, effectively joining K:Iterable<V1> with K:V2:
>
> kvIterable1 = kv1.apply("GBK to mitigate hot keys", GroupByKey.<K, V1>create())
>
> Join.someJoin(kvIterable1, kv2)
>     .apply(Values.create())
>     .apply("undo hot key GBK",
>         ParDo.of(new DoFn<KV<Iterable<V1>, V2>, KV<V1, V2>>() {
>           @ProcessElement
>           public void fanout(ProcessContext context) {
>             for (V1 v1 : context.element().getKey()) {
>               context.output(KV.of(v1, context.element().getValue()));
>             }
>           }
>         }))
>
> Does that look sane to people who have been working with Beam for a long
> time? It has worked well for us over the last two months or so.
>
> (2 of 2)
>
> Lately, the size of the value has grown too large. It took some effort to
> figure out the problem, which manifested as an
> ArrayIndexOutOfBoundsException emitted from RandomAccessData.write().
> Here's the follow-up solution, only changing the first half of the above
> solution:
>
> kvIterable1 = kv1
>     .apply("GBK to mitigate hot keys", GroupByKey.<K, V1>create())
>     .apply("partition grouped values",
>         ParDo.of(new DoFn<KV<K, Iterable<V1>>, KV<K, Iterable<V1>>>() {
>           @ProcessElement
>           public void partition(ProcessContext context) {
>             K k = context.element().getKey();
>             Iterable<V1> v1Iterable = context.element().getValue();
>             for (List<V1> partition : Iterables.partition(v1Iterable, 1000000)) {
>               context.output(KV.<K, Iterable<V1>>of(k, partition));
>             }
>           }
>         }));
>
> Again, is this sane? Initial testing suggests this is a good solution.
>
> Jacob
>
