Obviously, when trying to simplify my code I didn't substitute the join variable correctly. It should be:
DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
    attrToExpand.join(*subset*).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);

Do you think that a JoinHint to create a sort-merge join is equivalent to my solution?

On Tue, Sep 8, 2015 at 10:45 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi Flavio!
>
> No, Flink does not join keys before full values. That is often very
> inefficient, as it effectively results in two joins, one of which is
> typically about as expensive as the original join.
>
> One can do "semi-join-reduction", in case the join filters out many values
> (many elements from one side do not find a match in the other side). If the
> join does not filter, this does not help either.
>
> Your code is a bit of a surprise. Especially because, in your solution that
> worked, the first statement does nothing:
>
> DataSet<Tuple2<String, List<ThriftObj>>> subset =
>     attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();
>
> This builds a join, but then takes the second input of the join (the
> bigDataset data set). Because the result of the join is never actually
> used, it is never executed. The second statement hence is effectively
>
> DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
>     attrToExpand.join(bigDataset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);
>
> I am curious why this executed when the original did not.
>
> BTW: If the Lists are very long, so that they do not fit into a hash-table
> memory partition, you can try to use a JoinHint to create a sort-merge
> join. It may become slower, but it typically works with even less memory.
>
> Greetings,
> Stephan
>
> On Tue, Sep 8, 2015 at 9:59 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> Hi to all,
>>
>> I have a case where I don't understand why Flink is not able to optimize
>> the join between two datasets.
>>
>> My initial code was basically this:
>>
>> DataSet<Tuple2<String, List<ThriftObj>>> bigDataset = ...; // 5,257,207 elements
>> DataSet<Tuple2<String, List<MyObject>>> attrToExpand = ...; // 65,000 elements
>>
>> DataSet<Tuple2<String, IndexAttributeToExpand>> tmp =
>>     attrToExpand.joinWithHuge(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);
>>
>> This job wasn't able to complete on my local machine (from Eclipse)
>> because Flink was giving me the following error:
>>
>> Hash join exceeded maximum number of recursions, without reducing
>> partitions enough to be memory resident. Probably cause: Too many
>> duplicate keys.
>>
>> This was because in attrToExpand the List<MyObject> could be quite big.
>> Indeed, changing that code to the following makes everything work like a
>> charm:
>>
>> DataSet<Tuple2<String, List<ThriftObj>>> subset =
>>     attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();
>>
>> DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
>>     attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);
>>
>> Is it really impossible for Flink to optimize my initial code into the
>> second? I was convinced that Flink was performing a join only on the keys
>> before also grabbing the other elements of the tuples into memory... am I
>> wrong?
>>
>> Best,
>> Flavio
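For reference, a minimal sketch of the JoinHint variant Stephan suggests,
assuming the attrToExpand and subset DataSets from above (REPARTITION_SORT_MERGE
is the hint that forces a sort-merge execution strategy instead of a hash join):

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;

DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
    attrToExpand
        .join(subset, JoinHint.REPARTITION_SORT_MERGE) // sort-merge instead of hash join
        .where(0).equalTo(0)
        .projectFirst(0, 1)
        .projectSecond(1);

As Stephan notes, this may run slower than the hash join, but it degrades
gracefully when single keys carry very large values.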
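And a rough sketch of the semi-join-reduction Stephan describes, which only
pays off when the join filters out many big-side records; the keys and
reducedBig names are illustrative, not from the thread:

DataSet<Tuple1<String>> keys = attrToExpand
    .<Tuple1<String>>project(0)  // keep only the join key
    .distinct();

DataSet<Tuple2<String, List<ThriftObj>>> reducedBig = bigDataset
    .join(keys)
    .where(0).equalTo(0)
    .projectFirst(0, 1);  // drop big-side records whose key never occurs in attrToExpand

DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset = attrToExpand
    .join(reducedBig)
    .where(0).equalTo(0)
    .projectFirst(0, 1)
    .projectSecond(1);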