Ah.. fortunately it seems to do what I need :) It efficiently filters the bigDataset, retaining only the needed elements and making the join feasible with little memory.. :) So is that a bug? What would be the right way to achieve that behaviour with Flink?
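For readers skimming the thread: the behaviour described here — using the small side's keys to shrink bigDataset before the real join — is the "semi-join-reduction" Stephan mentions below. A minimal plain-Java sketch of the idea (names and types are illustrative, not Flink internals):

```java
import java.util.*;
import java.util.stream.*;

class SemiJoinReduction {
    // Step 1: filter the big side down to keys that also appear in the
    // small side, so the subsequent join only has to hold the survivors.
    static Map<String, String> semiJoinFilter(Map<String, String> big,
                                              Set<String> smallKeys) {
        return big.entrySet().stream()
                  .filter(e -> smallKeys.contains(e.getKey()))
                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    // Step 2: the real inner join, now running against the reduced big side.
    static Map<String, String[]> join(Map<String, String> small,
                                      Map<String, String> bigSubset) {
        Map<String, String[]> out = new HashMap<>();
        for (Map.Entry<String, String> e : small.entrySet()) {
            String right = bigSubset.get(e.getKey());
            if (right != null) {
                out.put(e.getKey(), new String[] { e.getValue(), right });
            }
        }
        return out;
    }
}
```

In the DataSet API this would amount to joining attrToExpand.project(0) with bigDataset and keeping the projected join result — rather than calling getInput2(), which (as Stephan notes below) just returns the join's input unchanged.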
On Tue, Sep 8, 2015 at 11:22 AM, Stephan Ewen <se...@apache.org> wrote:

> The problem is the "getInput2()" call. It takes the input to the join, not
> the result of the join. That way, the first join never happens.
>
> On Tue, Sep 8, 2015 at 11:10 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> Obviously, when trying to simplify my code I didn't substitute the
>> variable of the join correctly..it should be:
>>
>> DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
>>     attrToExpand.join(*subset*).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);
>>
>> Do you think that a JoinHint to create a sort-merge join is equivalent to
>> my solution?
>>
>> On Tue, Sep 8, 2015 at 10:45 AM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi Flavio!
>>>
>>> No, Flink does not join keys before full values. That is very often
>>> inefficient, as it effectively results in two joins, where one is
>>> typically about as expensive as the original join.
>>>
>>> One can do a "semi-join-reduction" in case the join filters out many
>>> values (i.e., many elements from one side do not find a match in the
>>> other side). If the join does not filter, this does not help either.
>>>
>>> Your code is a bit of a surprise. In particular, in the solution that
>>> worked, the first statement does nothing:
>>>
>>> DataSet<Tuple2<String, List<ThriftObj>>> subset =
>>>     attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();
>>>
>>> This builds a join, but then takes the second input of the join (the
>>> bigDataset data set). Because the result of the join is never actually
>>> used, it is never executed. The second statement hence is effectively
>>>
>>> DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
>>>     attrToExpand.join(bigDataset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);
>>>
>>> I am curious why this executed when the original did not.
>>> BTW: If the Lists are very long, so that they do not fit into a hash
>>> table memory partition, you can try to use a JoinHint to create a
>>> sort-merge join. It may be slower, but it typically works with even
>>> less memory.
>>>
>>> Greetings,
>>> Stephan
>>>
>>> On Tue, Sep 8, 2015 at 9:59 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>>> Hi to all,
>>>>
>>>> I have a case where I don't understand why Flink is not able to
>>>> optimize the join between 2 datasets.
>>>>
>>>> My initial code was basically this:
>>>>
>>>> DataSet<Tuple2<String, List<ThriftObj>>> bigDataset = ...; // 5,257,207 elements
>>>> DataSet<Tuple2<String, List<MyObject>>> attrToExpand = ...; // 65,000 elements
>>>>
>>>> DataSet<Tuple2<String, IndexAttributeToExpand>> tmp =
>>>>     attrToExpand.joinWithHuge(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);
>>>>
>>>> This job wasn't able to complete on my local machine (run from
>>>> Eclipse) because Flink was giving me the following error:
>>>>
>>>> Hash join exceeded maximum number of recursions, without reducing
>>>> partitions enough to be memory resident. Probably cause: Too many
>>>> duplicate keys.
>>>>
>>>> This was because in attrToExpand the List<MyObject> could be quite
>>>> big. Indeed, changing that code to the following made everything work
>>>> like a charm:
>>>>
>>>> DataSet<Tuple2<String, List<ThriftObj>>> subset =
>>>>     attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();
>>>>
>>>> DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
>>>>     attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);
>>>>
>>>> Isn't it possible for Flink to optimize my initial code into the
>>>> second? I was convinced that Flink was performing a join only on the
>>>> keys before also loading the other elements of the tuples into
>>>> memory..am I wrong?
>>>>
>>>> Best,
>>>> Flavio
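On the JoinHint question in the thread: in the DataSet API the hint is passed as a second argument to join, e.g. join(subset, JoinHint.REPARTITION_SORT_MERGE) — worth verifying against your Flink version. The reason a sort-merge join can succeed where the hash join exhausts its memory budget is that, once both inputs are sorted by key, only the current key group has to be held in memory at once. A plain-Java sketch of the merge phase (illustrative only, not Flink internals; inputs are (key, value) pairs already sorted by key):

```java
import java.util.*;

class SortMergeJoin {
    // Merge-join two lists of (key, value) pairs that are sorted by key.
    // Unlike a hash join, no side is materialized into a hash table:
    // both cursors advance in lockstep and only the records of the
    // current matching key group are touched together.
    static List<String> merge(List<String[]> left, List<String[]> right) {
        List<String> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < left.size() && j < right.size()) {
            int cmp = left.get(i)[0].compareTo(right.get(j)[0]);
            if (cmp < 0) {
                i++; // left key has no partner yet, skip it
            } else if (cmp > 0) {
                j++; // right key has no partner yet, skip it
            } else {
                String key = left.get(i)[0];
                // Emit the cross product of the matching key group.
                for (int i2 = i; i2 < left.size() && left.get(i2)[0].equals(key); i2++) {
                    for (int j2 = j; j2 < right.size() && right.get(j2)[0].equals(key); j2++) {
                        out.add(key + ":" + left.get(i2)[1] + "," + right.get(j2)[1]);
                    }
                }
                // Advance both cursors past the current key group.
                while (i < left.size() && left.get(i)[0].equals(key)) i++;
                while (j < right.size() && right.get(j)[0].equals(key)) j++;
            }
        }
        return out;
    }
}
```

Note that duplicate keys still blow up the output via the cross product, but they no longer blow up a single in-memory hash partition, which matches Stephan's "slower, but works with even less memory" trade-off.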