On Mon, Feb 12, 2018 at 3:59 PM, Lukasz Cwik <lc...@google.com> wrote:

> The optimization that you have done is that you have forced the V1
> iterable to reside in memory completely since it is now counted as a single
> element. This will fall apart as soon as your V1 iterable exceeds memory.
> Runners like Dataflow allow re-iteration of a GBK/CoGBK result allowing
> for the GBK/CoGBK result to exceed the size of memory and this currently
> only functions at the first level within the value iterable, meaning that
> the entire Iterable<V1> is treated as a single value in your
> Join.someJoin. You should see similar performance if you take all the V1s
> out of the CoGBK and "copy" them into an ArrayList inside your DoFn and then
> walk the V2 iterable and the in-memory ArrayList performing the outer
> join. It will also likely be easier to reason about. Note that Dataflow
> doesn't do this in a great way and causes the re-iteration to happen many
> more times than it should need to, which is why your perf numbers are
> ballooning.
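
The in-memory variant suggested above can be sketched without any Beam types. This is a minimal, hypothetical illustration of the per-key logic only: the class name `InMemoryOuterJoin`, the method `leftOuterJoin`, and the `nullValue` placeholder are assumptions made for this example, not part of Beam or of the original pipeline. In a real DoFn the two iterables would come from the CoGBK result for a single key.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: per-key left outer join with V1 held in memory.
final class InMemoryOuterJoin {
  static <V1, V2> List<Map.Entry<V1, V2>> leftOuterJoin(
      Iterable<V1> v1Values, Iterable<V2> v2Values, V2 nullValue) {
    // Copy V1 once, so the (possibly lazy) source iterable is iterated only once.
    List<V1> v1InMemory = new ArrayList<>();
    for (V1 v1 : v1Values) {
      v1InMemory.add(v1);
    }

    List<Map.Entry<V1, V2>> out = new ArrayList<>();
    boolean sawV2 = false;
    // Single pass over V2; the inner loop only touches the in-memory copy.
    for (V2 v2 : v2Values) {
      sawV2 = true;
      for (V1 v1 : v1InMemory) {
        out.add(new SimpleImmutableEntry<>(v1, v2));
      }
    }
    // Left outer semantics: keep unmatched V1 values, paired with nullValue.
    if (!sawV2) {
      for (V1 v1 : v1InMemory) {
        out.add(new SimpleImmutableEntry<>(v1, nullValue));
      }
    }
    return out;
  }
}
```

As noted in the quoted reply, this only works while all V1 values for a key fit in memory.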

Not sure: are you referring to (1) or (2) of my original message?

So (2) did fail with full production load. The join has 2+ billion elements
on both sides.

> Alternatively, have you tried putting either of the PCollection<K, V (V1
> or V2)> into a multimap side input and then just doing a GBK on the other
> PCollection<K, V (V2 or V1)> followed by a DoFn that joins the two together
> with the multimap side input?
> The choice of whether V1 or V2 works better in the side input depends on
> the relative sizes of the PCollections and whether the working set of the
> PCollection can be cached in memory (good for side input) or the GBK
> PCollection is sparse enough that if everything is a cache miss it won't
> matter.

I'll try this. K:V2 has unique keys and doesn't change a lot from
day-to-day, so I'll make that a side input. Should I expect this method to
perform significantly slower than Join.someJoin/CoGBK?
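
A rough sketch of the per-key logic for the side-input approach, in plain Java. The `Map<K, List<V2>>` stands in for a multimap side input and `joinOneKey` stands in for the body of the joining DoFn; both names, and the `nullValue` placeholder, are hypothetical. It assumes left outer semantics, which may not match the join actually needed.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a DoFn that joins grouped V1 values
// against a V2 lookup table (the role a multimap side input would play).
final class SideInputJoinSketch {
  static <K, V1, V2> List<Map.Entry<V1, V2>> joinOneKey(
      K key, Iterable<V1> v1Values, Map<K, List<V2>> v2ByKey, V2 nullValue) {
    // One lookup per key; a missing key yields an empty match list.
    List<V2> matches = v2ByKey.getOrDefault(key, Collections.emptyList());

    List<Map.Entry<V1, V2>> out = new ArrayList<>();
    // V1 is streamed exactly once; a hot key never forces re-iteration.
    for (V1 v1 : v1Values) {
      if (matches.isEmpty()) {
        out.add(new SimpleImmutableEntry<>(v1, nullValue));
      } else {
        for (V2 v2 : matches) {
          out.add(new SimpleImmutableEntry<>(v1, v2));
        }
      }
    }
    return out;
  }
}
```

With unique keys on the K:V2 side, each match list holds at most one element, so the output for a key has exactly as many pairs as there are V1 values.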

On Mon, Feb 12, 2018 at 1:53 PM, Jacob Marble <jmar...@kochava.com> wrote:
>> When joining (Join.leftOuterJoin etc) a PCollection<K, V1> to
>> PCollection<K, V2>, and K:V1 contains hot keys, my pipeline gets very slow.
>> It can bring processing time from hours to days.
>> Reading this blog post
>> <https://cloud.google.com/blog/big-data/2016/02/writing-dataflow-pipelines-with-scalability-in-mind>
>> I can see some thought has already been given to this problem:
>> "To address this, we allow you to provide extra parallelism hints using
>> the Combine.PerKey.withHotKeyFanout or Combine.Globally.withFanout.
>> These operations will create an extra step in your pipeline to
>> pre-aggregate the data on many machines before performing the final
>> aggregation on the target machines."
>> (1 of 2)
>> These two solutions, Combine.PerKey.withHotKeyFanout or
>> Combine.Globally.withFanout, do not help with a join (CoGBK) operation,
>> however. So, I solved my problem with these stages before and after the
>> join operation, effectively joining K:Iterable<V1> with K:V2:
>> kvIterable1 = kv1.apply("GBK to mitigate hot keys",
>>     GroupByKey.<K, V1>create());
>> Join.someJoin(kvIterable1, kv2)
>>         .apply(Values.create())
>>         .apply("undo hot key GBK",
>>             ParDo
>>                 .of(new DoFn<KV<Iterable<V1>, V2>, KV<V1, V2>>() {
>>                   @ProcessElement
>>                   public void fanout(ProcessContext context) {
>>                     for (V1 v1 : context.element().getKey()) {
>>                       context.output(
>>                           KV.of(v1, context.element().getValue()));
>>                     }
>>                   }
>>                 }));
>> Does that look sane to people who have been working with Beam for a long
>> time? It has worked well for us over the last two months or so.
>> (2 of 2)
>> Lately, the size of the value has grown too large. It took some effort to
>> figure out the problem, which manifested as an
>> ArrayIndexOutOfBoundsException emitted from RandomAccessData.write().
>> Here's the follow-up solution, only changing the first half of the above
>> solution:
>> kvIterable1 = kv1
>>         .apply("GBK to mitigate hot keys", GroupByKey.<K, V1>create())
>>         .apply("partition grouped values",
>>             ParDo
>>                 .of(new DoFn<KV<K, Iterable<V1>>, KV<K, Iterable<V1>>>() {
>>                   @ProcessElement
>>                   public void partition(ProcessContext context) {
>>                     K k = context.element().getKey();
>>                     Iterable<V1> v1Iterable = context.element().getValue();
>>                     for (List<V1> partition :
>>                         Iterables.partition(v1Iterable, 1000000)) {
>>                       context.output(KV.<K, Iterable<V1>>of(k, partition));
>>                     }
>>                   }
>>                 }));
>> Again, is this sane? Initial testing suggests this is a good solution.
>> Jacob
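
For readers without Guava at hand, the chunking step in (2) might look roughly like this in plain Java. `forEachChunk` and the `emit` callback are hypothetical names; `emit` stands in for `context.output(...)`. The sketch walks the iterable once and holds only one chunk at a time, though each chunk still has to fit in memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: split one large grouped value into bounded chunks.
final class ChunkSketch {
  static <T> void forEachChunk(
      Iterable<T> values, int chunkSize, Consumer<List<T>> emit) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    List<T> current = new ArrayList<>();
    for (T value : values) {
      current.add(value);
      if (current.size() == chunkSize) {
        // Hand off the full chunk and start a fresh list for the next one.
        emit.accept(current);
        current = new ArrayList<>();
      }
    }
    // Emit the final, possibly shorter, chunk.
    if (!current.isEmpty()) {
      emit.accept(current);
    }
  }
}
```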
