After implementing this, I'm surprised by how long View.AsMap takes to
complete. GBKaSVForKeys and ToIsmMetadataRecordForKey seem to be the
bottleneck; the GCE instance group is using 0.5% CPU and about 8M/s network.

Browsing the implementation, I think this is sorting the keys, which makes
sense, and makes me think there likely isn't room for performance
improvement.

The job is going to take much longer now, no matter how fast the new
pseudo-join goes. The source of this view is a Bigtable, and the table key
is the same as the join key. I wonder if there's a way to leverage that.


On Tue, Feb 13, 2018 at 9:46 AM, Lukasz Cwik <> wrote:

> Both are doing the same thing effectively by loading the entire iterable
> into memory in the first case and the partitioned iterable into memory in
> the second case.
> The side input performance varies a lot depending on whether you're running
> a pipeline with bounded or unbounded PCollections, the PCollection sizes,
> and the side input access pattern.
> On Tue, Feb 13, 2018 at 9:08 AM, Jacob Marble <> wrote:
>> On Mon, Feb 12, 2018 at 3:59 PM, Lukasz Cwik <> wrote:
>>> The optimization that you have done is that you have forced the V1
>>> iterable to reside in memory completely since it is now counted as a single
>>> element. This will fall apart as soon as your V1 iterable exceeds memory.
>>> Runners like Dataflow allow re-iteration of a GBK/CoGBK result allowing
>>> for the GBK/CoGBK result to exceed the size of memory and this currently
>>> only functions at the first level within the value iterable, meaning that
>>> the entire Iterable<V1> is treated as a single value in your
>>> Join.someJoin. You should see similar performance if you take all the
>>> V1s out of the CoGBK and "copy" it into an arraylist inside your DoFn and
>>> then walk the V2 iterable and the in memory array list performing the outer
>>> join. It will also likely be easier to reason about. Note that Dataflow
>>> doesn't do this in a great way and causes the re-iteration to happen many
>>> more times than it should need to, which is why your perf numbers are
>>> ballooning.
>> Not sure, are you referring to (1) or (2) of my original?
>> So (2) did fail with full production load. The join has 2+ billion
>> elements on both sides.
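
The in-memory approach suggested above (copy all V1s for a key into a list, then walk the V2 iterable once) can be sketched in plain Java. This is only an illustration of the per-key logic, not Beam code: the class and method names are hypothetical, and ordinary collections stand in for the CoGBK result iterables.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: per-key left outer join with the V1 side held in memory.
final class InMemoryJoinSketch {
  static <V1, V2> List<Map.Entry<V1, V2>> leftOuterJoin(
      Iterable<V1> lefts, Iterable<V2> rights) {
    // Copy the V1 values once so the (possibly lazy) iterable is never re-iterated.
    List<V1> leftList = new ArrayList<>();
    for (V1 left : lefts) {
      leftList.add(left);
    }

    List<Map.Entry<V1, V2>> out = new ArrayList<>();
    boolean sawRight = false;
    // Single pass over the V2 iterable; the inner loop only touches the in-memory list.
    for (V2 right : rights) {
      sawRight = true;
      for (V1 left : leftList) {
        out.add(new SimpleEntry<V1, V2>(left, right));
      }
    }
    // Outer-join case: no V2 for this key, so emit each V1 with a null partner.
    if (!sawRight) {
      for (V1 left : leftList) {
        out.add(new SimpleEntry<V1, V2>(left, null));
      }
    }
    return out;
  }
}
```

As noted in the thread, this only holds up while the V1 values for a single key fit in memory.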
>>> Alternatively, have you tried putting either of the PCollection<K, V (V1
>>> or V2)> into a multimap side input and then just doing a GBK on the other
>>> PCollection<K, V (V2 or V1)> followed by a DoFn that joins the two together
>>> with the multimap side input?
>>> The choice of whether V1 or V2 works better in the side input depends on
>>> the relative sizes of the PCollections and whether the working set of the
>>> PCollection can be cached in memory (good for side input) or the GBK
>>> PCollection is sparse enough that if everything is a cache miss it won't
>>> matter.
>> I'll try this. K:V2 has unique keys and doesn't change a lot from
>> day-to-day, so I'll make that a side input. Should I expect this method to
>> perform significantly slower than Join.someJoin/CoGBK?
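
The multimap side-input join discussed above can be sketched in plain Java. This is a stand-in under stated assumptions, not pipeline code: a `Map<K, List<V2>>` plays the role of the multimap side input (in Beam this would presumably be built with `View.asMultimap()`), a `Map<K, List<V1>>` plays the role of the GBK output, and the class and method names are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper: left outer join of grouped V1s against a multimap lookup of V2s.
final class MultimapSideInputJoinSketch {
  static <K, V1, V2> List<Map.Entry<V1, V2>> join(
      Map<K, List<V1>> grouped, Map<K, List<V2>> sideInput) {
    List<Map.Entry<V1, V2>> out = new ArrayList<>();
    for (Map.Entry<K, List<V1>> group : grouped.entrySet()) {
      // One lookup per key, mirroring a side-input access inside a DoFn.
      List<V2> matches =
          sideInput.getOrDefault(group.getKey(), Collections.<V2>emptyList());
      for (V1 v1 : group.getValue()) {
        if (matches.isEmpty()) {
          // No match in the side input: keep the V1 with a null partner.
          out.add(new SimpleEntry<V1, V2>(v1, null));
        } else {
          for (V2 v2 : matches) {
            out.add(new SimpleEntry<V1, V2>(v1, v2));
          }
        }
      }
    }
    return out;
  }
}
```

With unique keys on the K:V2 side, each lookup returns at most one value, so the output size equals the number of V1 elements.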
>> On Mon, Feb 12, 2018 at 1:53 PM, Jacob Marble <>
>>> wrote:
>>>> When joining (Join.leftOuterJoin etc) a PCollection<K, V1> to
>>>> PCollection<K, V2>, and K:V1 contains hot keys, my pipeline gets very slow.
>>>> It can bring processing time from hours to days.
>>>> Reading this blog post <>, I can see some thought has already been
>>>> given to this problem:
>>>> "To address this, we allow you to provide extra parallelism hints using
>>>> the Combine.PerKey.withHotKeyFanout or Combine.Globally.withFanout.
>>>> These operations will create an extra step in your pipeline to
>>>> pre-aggregate the data on many machines before performing the final
>>>> aggregation on the target machines."
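
To make the quoted pre-aggregation idea concrete, here is a minimal plain-Java sketch of a two-stage sum. It is not how `Combine.PerKey.withHotKeyFanout` is implemented; the class name, the round-robin shard assignment, and the use of a sum as the combine function are all assumptions chosen for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical two-stage aggregation: partial sums per (key, shard), then a final sum per key.
final class HotKeyFanoutSketch {
  static Map<String, Long> sumWithFanout(List<Map.Entry<String, Long>> input, int fanout) {
    if (fanout <= 0) {
      throw new IllegalArgumentException("fanout must be positive");
    }
    // Stage 1: spread each key's values over `fanout` shards and pre-aggregate per shard.
    // Round-robin assignment stands in for whatever sharding a runner would use.
    Map<String, long[]> partialSums = new HashMap<>();
    int next = 0;
    for (Map.Entry<String, Long> element : input) {
      long[] shards = partialSums.computeIfAbsent(element.getKey(), k -> new long[fanout]);
      shards[next] += element.getValue();
      next = (next + 1) % fanout;
    }
    // Stage 2: the final aggregation sees at most `fanout` partial results per key.
    Map<String, Long> totals = new HashMap<>();
    for (Map.Entry<String, long[]> entry : partialSums.entrySet()) {
      long total = 0;
      for (long shardSum : entry.getValue()) {
        total += shardSum;
      }
      totals.put(entry.getKey(), total);
    }
    return totals;
  }
}
```

The trick relies on the combine function being associative and commutative, so partial results can be merged in any order.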
>>>> (1 of 2)
>>>> These two solutions, Combine.PerKey.withHotKeyFanout or
>>>> Combine.Globally.withFanout, do not help with a join (CoGBK) operation,
>>>> however. So, I solved my problem with these stages before and after the
>>>> join operation, effectively joining K:Iterable<V1> with K:V2:
>>>> kvIterable1 = kv1.apply("GBK to mitigate hot keys",
>>>>         GroupByKey.<K, V1>create())
>>>> Join.someJoin(kvIterable1, kv2)
>>>>         .apply(Values.create())
>>>>         .apply("undo hot key GBK",
>>>>             ParDo
>>>>                 .of(new DoFn<KV<Iterable<V1>, V2>, KV<V1, V2>>() {
>>>>                   @ProcessElement
>>>>                   public void fanout(ProcessContext context) {
>>>>                     for (V1 v1 : context.element().getKey()) {
>>>>                       context.output(
>>>>                           KV.of(v1, context.element().getValue()));
>>>>                     }
>>>>                   }
>>>>                 }))
>>>> Does that look sane to people who have been working with Beam for a
>>>> long time? It has worked well for us over the last two months or so.
>>>> (2 of 2)
>>>> Lately, the size of the value has grown too large. It took some effort
>>>> to figure out the problem, which manifested as an
>>>> ArrayIndexOutOfBoundsException emitted from RandomAccessData.write().
>>>> Here's the follow-up solution, only changing the first half of the above
>>>> solution:
>>>> kvIterable1 = kv1
>>>>         .apply("GBK to mitigate hot keys", GroupByKey.<K, V1>create())
>>>>         .apply("partition grouped values",
>>>>             ParDo
>>>>                 .of(new DoFn<KV<K, Iterable<V1>>,
>>>>                     KV<K, Iterable<V1>>>() {
>>>>                   @ProcessElement
>>>>                   public void partition(ProcessContext context) {
>>>>                     K k = context.element().getKey();
>>>>                     Iterable<V1> v1Iterable =
>>>>                         context.element().getValue();
>>>>                     for (List<V1> partition :
>>>>                         Iterables.partition(v1Iterable, 1000000)) {
>>>>                       context.output(
>>>>                           KV.<K, Iterable<V1>>of(k, partition));
>>>>                     }
>>>>                   }
>>>>                 }));
>>>> Again, is this sane? Initial testing suggests this is a good solution.
>>>> Jacob
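
For readers unfamiliar with the partitioning step in (2), the following plain-Java sketch shows the chunking behaviour it relies on: consecutive sublists of at most a fixed size, with a shorter final chunk. It is an eager stand-in written for illustration (Guava's `Iterables.partition` is lazy), and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical eager equivalent of chunking an iterable into bounded-size lists.
final class PartitionSketch {
  static <T> List<List<T>> partition(Iterable<T> values, int size) {
    if (size <= 0) {
      throw new IllegalArgumentException("size must be positive");
    }
    List<List<T>> chunks = new ArrayList<>();
    List<T> current = new ArrayList<>();
    for (T value : values) {
      current.add(value);
      // Close the chunk once it reaches the size limit.
      if (current.size() == size) {
        chunks.add(current);
        current = new ArrayList<>();
      }
    }
    // Emit the last, possibly shorter, chunk.
    if (!current.isEmpty()) {
      chunks.add(current);
    }
    return chunks;
  }
}
```

Each chunk bounds the size of a single output element, but every chunk is still held in memory in full while it is processed.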
