On Mon, Feb 12, 2018 at 3:59 PM, Lukasz Cwik <lc...@google.com> wrote:
> The optimization that you have done is that you have forced the V1
> iterable to reside in memory completely, since it is now counted as a
> single element. This will fall apart as soon as your V1 iterable exceeds
> memory. Runners like Dataflow allow re-iteration of a GBK/CoGBK result,
> so the GBK/CoGBK result can exceed the size of memory, but this currently
> only works at the first level within the value iterable, meaning that the
> entire Iterable<V1> is treated as a single value in your Join.someJoin.
> You should see similar performance if you take all the V1s out of the
> CoGBK, "copy" them into an ArrayList inside your DoFn, and then walk the
> V2 iterable and the in-memory ArrayList performing the outer join. It
> will also likely be easier to reason about. Note that Dataflow doesn't do
> this in a great way and causes the re-iteration to happen many more times
> than it should, which is why your perf numbers are ballooning.

Not sure, are you referring to (1) or (2) of my original? (2) did fail
with the full production load; the join has 2+ billion elements on both
sides.

> Alternatively, have you tried putting either of the PCollection<K, V (V1
> or V2)> into a multimap side input and then just doing a GBK on the other
> PCollection<K, V (V2 or V1)>, followed by a DoFn that joins the two
> together with the multimap side input?
> The choice of whether V1 or V2 works better in the side input depends on
> the relative sizes of the PCollections and whether the working set of the
> PCollection can be cached in memory (good for side input), or the GBK
> PCollection is sparse enough that it won't matter if everything is a
> cache miss.

I'll try this. K:V2 has unique keys and doesn't change much from day to
day, so I'll make that the side input. Should I expect this method to be
significantly slower than Join.someJoin/CoGBK?
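Here's a rough, untested sketch of what I have in mind for the side-input
version, using the same K/V1/V2/kv1/kv2 placeholders as my code below; the
View.asMultimap() view and the left-outer handling are my guesses at the
wiring, not something I've run yet:

final PCollectionView<Map<K, Iterable<V2>>> v2View =
    kv2.apply("V2 as multimap side input", View.<K, V2>asMultimap());

PCollection<KV<V1, V2>> joined = kv1
    .apply("GBK to mitigate hot keys", GroupByKey.<K, V1>create())
    .apply("join against V2 side input",
        ParDo
            .of(new DoFn<KV<K, Iterable<V1>>, KV<V1, V2>>() {
              @ProcessElement
              public void join(ProcessContext context) {
                // One side-input lookup per key, then stream the (possibly huge) V1 iterable.
                Map<K, Iterable<V2>> v2Map = context.sideInput(v2View);
                Iterable<V2> v2s = v2Map.get(context.element().getKey());
                for (V1 v1 : context.element().getValue()) {
                  if (v2s == null) {
                    // Left outer: no V2 for this key (assumes a null-friendly V2 coder
                    // or a placeholder value in practice).
                    context.output(KV.<V1, V2>of(v1, null));
                  } else {
                    for (V2 v2 : v2s) {
                      context.output(KV.of(v1, v2));
                    }
                  }
                }
              }
            })
            .withSideInputs(v2View));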
On Mon, Feb 12, 2018 at 1:53 PM, Jacob Marble <jmar...@kochava.com> wrote:
>
>> When joining (Join.leftOuterJoin etc) a PCollection<K, V1> to a
>> PCollection<K, V2>, and K:V1 contains hot keys, my pipeline gets very
>> slow. It can bring processing time from hours to days.
>>
>> Reading this blog post
>> <https://cloud.google.com/blog/big-data/2016/02/writing-dataflow-pipelines-with-scalability-in-mind>
>> I can see some thought has already been given to this problem:
>> "To address this, we allow you to provide extra parallelism hints using
>> the Combine.PerKey.withHotKeyFanout or Combine.Globally.withFanout.
>> These operations will create an extra step in your pipeline to
>> pre-aggregate the data on many machines before performing the final
>> aggregation on the target machines."
>>
>> (1 of 2)
>>
>> These two solutions, Combine.PerKey.withHotKeyFanout and
>> Combine.Globally.withFanout, do not help with a join (CoGBK) operation,
>> however. So, I solved my problem with these stages before and after the
>> join operation, effectively joining K:Iterable<V1> with K:V2:
>>
>> kvIterable1 = kv1.apply("GBK to mitigate hot keys",
>>     GroupByKey.<K, V1>create());
>>
>> Join.someJoin(kvIterable1, kv2)
>>     .apply(Values.create())
>>     .apply("undo hot key GBK",
>>         ParDo.of(new DoFn<KV<Iterable<V1>, V2>, KV<V1, V2>>() {
>>           @ProcessElement
>>           public void fanout(ProcessContext context) {
>>             for (V1 v1 : context.element().getKey()) {
>>               context.output(KV.of(v1, context.element().getValue()));
>>             }
>>           }
>>         }));
>>
>> Does that look sane to people who have been working with Beam for a
>> long time? It has worked well for us over the last two months or so.
>>
>> (2 of 2)
>>
>> Lately, the size of the value has grown too large. It took some effort
>> to figure out the problem, which manifested as an
>> ArrayIndexOutOfBoundsException emitted from RandomAccessData.write().
>> Here's the follow-up solution, only changing the first half of the
>> above solution:
>>
>> kvIterable1 = kv1
>>     .apply("GBK to mitigate hot keys", GroupByKey.<K, V1>create())
>>     .apply("partition grouped values",
>>         ParDo.of(new DoFn<KV<K, Iterable<V1>>, KV<K, Iterable<V1>>>() {
>>           @ProcessElement
>>           public void partition(ProcessContext context) {
>>             K k = context.element().getKey();
>>             Iterable<V1> v1Iterable = context.element().getValue();
>>             for (List<V1> partition : Iterables.partition(v1Iterable, 1000000)) {
>>               context.output(KV.<K, Iterable<V1>>of(k, partition));
>>             }
>>           }
>>         }));
>>
>> Again, is this sane? Initial testing suggests this is a good solution.
>>
>> Jacob
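PS: for my own notes, this is how I read the earlier "copy the V1s into an
ArrayList inside your DoFn" suggestion (also just a sketch, untested; it
assumes all V1s for a single key fit in memory, which is exactly what I'm
worried about for our hot keys):

final TupleTag<V1> v1Tag = new TupleTag<V1>() {};
final TupleTag<V2> v2Tag = new TupleTag<V2>() {};

PCollection<KV<V1, V2>> joined = KeyedPCollectionTuple
    .of(v1Tag, kv1)
    .and(v2Tag, kv2)
    .apply(CoGroupByKey.<K>create())
    .apply("in-memory outer join",
        ParDo.of(new DoFn<KV<K, CoGbkResult>, KV<V1, V2>>() {
          @ProcessElement
          public void join(ProcessContext context) {
            CoGbkResult result = context.element().getValue();
            // Materialize the V1 side once per key, then stream the V2 side past it.
            List<V1> v1List = Lists.newArrayList(result.getAll(v1Tag));
            Iterator<V2> v2s = result.getAll(v2Tag).iterator();
            if (!v2s.hasNext()) {
              for (V1 v1 : v1List) {
                context.output(KV.<V1, V2>of(v1, null)); // left outer: no V2 for this key
              }
              return;
            }
            while (v2s.hasNext()) {
              V2 v2 = v2s.next();
              for (V1 v1 : v1List) {
                context.output(KV.of(v1, v2));
              }
            }
          }
        }));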