The problem is the "getInput2()" call. It returns the *input* to the join,
not the result of the join, so the first join never actually happens.
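
For illustration, a minimal sketch (reusing the types from the thread below;
the exact projection is an assumption): to keep only the matching records,
one could project the fields of the second input instead of calling
getInputSecond-style accessors:

DataSet<Tuple2<String, List<ThriftObj>>> subset =
    attrToExpand.project(0)
        .joinWithHuge(bigDataset)
        .where(0).equalTo(0)
        .projectSecond(0, 1);  // keep fields 0 and 1 of bigDataset

With that, "subset" is the actual join result rather than an alias for
bigDataset.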

On Tue, Sep 8, 2015 at 11:10 AM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Obviously, when trying to simplify my code I didn't substitute the join
> variable correctly... it should be:
>
> DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
>     attrToExpand.join(subset)
>         .where(0).equalTo(0)
>         .projectFirst(0, 1).projectSecond(1);
>
> Do you think that a JoinHint to create a sort-merge join is equivalent to
> my solution?
>
>
> On Tue, Sep 8, 2015 at 10:45 AM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi Flavio!
>>
>> No, Flink does not join keys before full values. That is very often
>> inefficient, as it effectively results in two joins, one of which is
>> typically about as expensive as the original join.
>>
>> One can do a "semi-join reduction" in case the join filters out many
>> values (i.e., many elements from one side find no match in the other
>> side). If the join does not filter much, this does not help either.
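>>
>> For illustration, a rough sketch of such a semi-join reduction (the
>> variable names are placeholders, and it assumes the types from your
>> code): first shrink the big side using only the distinct keys of the
>> small side, then run the final join against the reduced data set.
>>
>> DataSet<Tuple1<String>> keys = attrToExpand.project(0);
>> DataSet<Tuple2<String, List<ThriftObj>>> reduced =
>>     keys.distinct(0)                 // unique join keys only
>>         .joinWithHuge(bigDataset)
>>         .where(0).equalTo(0)
>>         .projectSecond(0, 1);        // keep only matching records
>>
>> // the final join then uses "reduced" instead of "bigDataset"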
>>
>> Your code is a bit of a surprise, especially because in your solution
>> that worked, the first statement does nothing:
>>
>> DataSet<Tuple2<String, List<ThriftObj>>> subset =
>>     attrToExpand.project(0).joinWithHuge(bigDataset)
>>         .where(0).equalTo(0).getInput2();
>>
>>
>> This builds a join, but then takes the second input of the join (the
>> bigDataset data set). Because the result of the join is never actually
>> used, it is never executed. The second statement is hence effectively:
>>
>> DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
>>     attrToExpand.join(bigDataset)
>>         .where(0).equalTo(0)
>>         .projectFirst(0, 1).projectSecond(1);
>>
>>
>> I am curious why this executed when the original did not.
>>
>> BTW: If the lists are so long that they do not fit into a hash table's
>> memory partition, you can try a JoinHint to create a sort-merge join. It
>> may be slower, but it typically works with even less memory.
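>>
>> For example, a sketch using the DataSet API's join-with-hint overload on
>> the join from your working solution (REPARTITION_SORT_MERGE requests the
>> sort-merge strategy; the surrounding types are assumed from your code):
>>
>> import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
>>
>> DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
>>     attrToExpand.join(subset, JoinHint.REPARTITION_SORT_MERGE)
>>         .where(0).equalTo(0)
>>         .projectFirst(0, 1).projectSecond(1);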
>>
>>
>> Greetings,
>> Stephan
>>
>>
>> On Tue, Sep 8, 2015 at 9:59 AM, Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> Hi to all,
>>>
>>> I have a case where I don't understand why Flink is not able to optimize
>>> the join between two datasets.
>>>
>>> My initial code was basically this:
>>>
>>> DataSet<Tuple2<String, List<ThriftObj>>> bigDataset = ...;   // 5.257.207 elements
>>> DataSet<Tuple2<String, List<MyObject>>> attrToExpand = ...;  // 65.000 elements
>>>
>>> DataSet<Tuple2<String, IndexAttributeToExpand>> tmp =
>>>     attrToExpand.joinWithHuge(subset)
>>>         .where(0).equalTo(0)
>>>         .projectFirst(0, 1).projectSecond(1);
>>>
>>> This job wasn't able to complete on my local machine (run from Eclipse)
>>> because Flink gave me the following error:
>>>
>>> Hash join exceeded maximum number of recursions, without reducing
>>> partitions enough to be memory resident. Probably cause: Too many duplicate
>>> keys.
>>>
>>> This was because in attrToExpand the List<MyObject> could be quite big.
>>> Indeed, changing that code to the following made everything work like a
>>> charm:
>>>
>>> DataSet<Tuple2<String, List<ThriftObj>>> subset =
>>>     attrToExpand.project(0).joinWithHuge(bigDataset)
>>>         .where(0).equalTo(0).getInput2();
>>>
>>> DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
>>>     attrToExpand.join(subset)
>>>         .where(0).equalTo(0)
>>>         .projectFirst(0, 1).projectSecond(1);
>>>
>>>
>>> Is it somehow impossible for Flink to optimize my initial code into the
>>> second? I was convinced that Flink performed the join only on the keys
>>> before also bringing the other elements of the tuples into memory... am
>>> I wrong?
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>
>
