When I join (Join.leftOuterJoin, etc.) a PCollection<KV<K, V1>> to a
PCollection<KV<K, V2>> and the K:V1 side contains hot keys, my pipeline gets
very slow. Processing time can grow from hours to days.

Reading this blog post, I
can see that some thought has already been given to this problem:
"To address this, we allow you to provide extra parallelism hints using the
Combine.PerKey.withHotKeyFanout or Combine.Globally.withFanout. These
operations will create an extra step in your pipeline to pre-aggregate the
data on many machines before performing the final aggregation on the target

(1 of 2)

These two solutions, Combine.PerKey.withHotKeyFanout or
Combine.Globally.withFanout, do not help with a join (CoGBK) operation,
however. So, I solved my problem with these stages before and after the
join operation, effectively joining K:Iterable<V1> with K:V2:

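kvIterable1 = kv1.apply("GBK to mitigate hot keys", GroupByKey.<K, V1>create());

Join.someJoin(kvIterable1, kv2)
        .apply("undo hot key GBK",
            ParDo
                .of(new DoFn<KV<Iterable<V1>, V2>, KV<V1, V2>>() {
                  @ProcessElement
                  public void fanout(ProcessContext context) {
                    // Emit one (V1, V2) pair per value in the grouped key.
                    for (V1 v1 : context.element().getKey()) {
                      context.output(KV.of(v1, context.element().getValue()));
                    }
                  }
                }));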

Does that look sane to people who have been working with Beam for a long
time? It has worked well for us over the last two months or so.
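
To make the data flow concrete outside of Beam, here is a minimal plain-Java
sketch of the same group, join, then fan-out idea. It is only an illustration:
the class and method names (HotKeyJoinSketch, groupByKey, joinAndFanOut) are
hypothetical, it assumes inner-join semantics and a right side with at most one
value per key, and it runs in memory rather than distributed.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; not Beam code and not distributed.
final class HotKeyJoinSketch {

  /** Stand-in for GroupByKey: collects all values of each key into one list. */
  static <K, V> Map<K, List<V>> groupByKey(List<Map.Entry<K, V>> input) {
    Map<K, List<V>> grouped = new LinkedHashMap<>();
    for (Map.Entry<K, V> e : input) {
      grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
    }
    return grouped;
  }

  /**
   * Joins one grouped element per key against the right side, then fans the
   * group back out into one (V1, V2) pair per original left value.
   */
  static <K, V1, V2> List<Map.Entry<V1, V2>> joinAndFanOut(
      Map<K, List<V1>> grouped, Map<K, V2> right) {
    List<Map.Entry<V1, V2>> out = new ArrayList<>();
    for (Map.Entry<K, List<V1>> e : grouped.entrySet()) {
      if (!right.containsKey(e.getKey())) {
        continue; // assumption: inner join, unmatched left keys are dropped
      }
      V2 v2 = right.get(e.getKey());
      for (V1 v1 : e.getValue()) {
        out.add(new AbstractMap.SimpleImmutableEntry<>(v1, v2));
      }
    }
    return out;
  }
}
```

The point of the sketch is that the join itself sees exactly one element per
key on the left side, however hot the key is; the per-value work only happens
in the fan-out loop afterwards.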

(2 of 2)

Lately, the size of the grouped value has grown too large. It took some
effort to figure out the problem, which manifested as an
ArrayIndexOutOfBoundsException thrown from RandomAccessData.write().
Here's the follow-up solution, which changes only the first half of the above:

kvIterable1 = kv1
        .apply("GBK to mitigate hot keys", GroupByKey.<K, V1>create())
        .apply("partition grouped values",
            ParDo
                .of(new DoFn<KV<K, Iterable<V1>>, KV<K, Iterable<V1>>>() {
                  @ProcessElement
                  public void partition(ProcessContext context) {
                    K k = context.element().getKey();
                    Iterable<V1> v1Iterable = context.element().getValue();
                    // Re-emit the group as chunks of at most 1,000,000 values.
                    for (List<V1> partition :
                        Iterables.partition(v1Iterable, 1000000)) {
                      context.output(KV.<K, Iterable<V1>>of(k, partition));
                    }
                  }
                }));

Again, is this sane? Initial testing suggests this is a good solution.
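
For anyone who wants to see the chunking behaviour in isolation, here is a
minimal plain-Java sketch of what the partition step does to one grouped value.
The class and method names (PartitionSketch, partition) are hypothetical, and
unlike Guava's Iterables.partition, which is lazy, this version materializes
all chunks eagerly, so treat it as an illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; an eager stand-in for Guava's Iterables.partition.
final class PartitionSketch {

  /** Splits values into consecutive chunks of at most chunkSize elements. */
  static <V> List<List<V>> partition(Iterable<V> values, int chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    List<List<V>> chunks = new ArrayList<>();
    List<V> current = new ArrayList<>();
    for (V v : values) {
      current.add(v);
      if (current.size() == chunkSize) {
        chunks.add(current);          // chunk is full, start a new one
        current = new ArrayList<>();
      }
    }
    if (!current.isEmpty()) {
      chunks.add(current);            // final, possibly shorter chunk
    }
    return chunks;
  }
}
```

With a chunk size of 2, the five values 1..5 come back as three chunks, the
last one holding a single element. In the pipeline this means a hot key shows
up as several elements on the left side of the join, each bounded in size.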

