Yes, that is the way to do it.

This makes me think that it would be nice to have a method that builds the
union of a list of data sets.

DataSet<T> union(DataSet<T>... sets)

It would be implemented like your loop. Would that be helpful?
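
A minimal sketch of such a helper, written against plain Java lists rather than Flink so that it runs on its own. The names `UnionSketch`, `unionAll`, and `concat` are hypothetical, and `concat` is only a stand-in for `DataSet.union`. The one point it is meant to show is that the result of each union has to be assigned back, because union returns a new data set instead of modifying the one it is called on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

class UnionSketch {

    /**
     * Folds a non-empty list of data sets into one by applying `union`
     * pairwise, left to right. `union` is passed in so that the sketch
     * does not depend on Flink (hypothetical helper, not a Flink API).
     */
    public static <D> D unionAll(List<D> sets, BinaryOperator<D> union) {
        if (sets.isEmpty()) {
            throw new IllegalArgumentException("need at least one data set");
        }
        D result = sets.get(0);
        for (D next : sets.subList(1, sets.size())) {
            // Keep the returned value: union does not change its receiver.
            result = union.apply(result, next);
        }
        return result;
    }

    /** Stand-in for DataSet.union: concatenation that keeps duplicates. */
    public static List<Integer> concat(List<Integer> a, List<Integer> b) {
        List<Integer> out = new ArrayList<>(a);
        out.addAll(b);
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> inputs = Arrays.asList(
                Arrays.asList(1, 2), Arrays.asList(2, 3), Arrays.asList(4));
        // prints [1, 2, 2, 3, 4]
        System.out.println(unionAll(inputs, UnionSketch::concat));
    }
}
```

With Flink on Java 8, passing `DataSet::union` as the second argument should presumably work the same way, though that is an assumption and has not been checked against 0.8.1. On older Java versions the plain loop with `ret = ret.union(sourceDs);` does the same thing.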

Stephan


On Tue, Mar 17, 2015 at 6:03 PM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> As always, one minute after I sent the email I found the problem!
> I have to assign the result of the union back to the initial dataset:
>     ret = ret.union(sourceDs);
>
> Bye,
> Flavio
>
> On Tue, Mar 17, 2015 at 5:58 PM, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> Hi Fabian,
>> I was trying to use the strategy you suggested with Flink 0.8.1, but it
>> seems that the union of the datasets cannot be created programmatically:
>> the union operator seems to give the generated dataset the name of the
>> calling function, so that only the first dataset is read.
>> My code looks like:
>>
>> private static DataSet<Tuple6<...>> getSourceDs(ExecutionEnvironment env,
>>         final String outputGraph, List<String> tableNames) {
>>     DataSet<Tuple6<...>> ret = null;
>>     for (String tableName : tableNames) {
>>         DataSet<Tuple6<...>> sourceDs = env.createInput(
>>                 new MyTableInputFormat(tableName))
>>                 ....
>>
>>         if (ret == null)
>>             ret = sourceDs;
>>         else
>>             ret.union(sourceDs);
>>     }
>>     return ret;
>> }
>>
>> Is this a bug or am I doing something wrong?
>> Thanks in advance,
>> Flavio
>>
>> On Mon, Dec 22, 2014 at 2:42 PM, <fhue...@gmail.com> wrote:
>>
>>> Union just combines data from multiple sources into a single dataset.
>>> That’s it. No memory, no disk involved.
>>>
>>> In your case you have
>>>
>>> input1.union(input2).groupBy(1).reduce(…)
>>>
>>> This will translate into:
>>>
>>> input1 -> repartition ->
>>>                           read-both-inputs -> sort -> reduce
>>> input2 -> repartition ->
>>>
>>> So in your case the union does not even cause additional network
>>> transfer, because both data sets would need to be partitioned for the
>>> reduce anyway.
>>>
>>> Note that union in Flink has SQL UNION ALL semantics, i.e., duplicates
>>> are not removed.
>>>
>>> Cheers, Fabian
>>>
>>> *From:* Flavio Pompermaier <pomperma...@okkam.it>
>>> *Sent:* Monday, 22 December 2014 14:32
>>> *To:* u...@flink.incubator.apache.org
>>>
>>> OK, thanks Fabian. I'd just like to understand the internals of the union
>>> of multiple datasets (partitioning, distribution among servers,
>>> memory/disk, etc.). Do you have any reference for this?
>>>
>>> Thanks in advance,
>>> Flavio
>>>
>>> On Mon, Dec 22, 2014 at 12:46 PM, Fabian Hueske <fhue...@apache.org>
>>> wrote:
>>>
>>>> Follow the first approach.
>>>> Joins are expensive, union comes for free.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2014-12-22 11:47 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>
>>>>> Hi guys,
>>>>>
>>>>> In my use case I have multiple datasets with the same structure (e.g.
>>>>> Tuple3) and I want to produce an output dataset containing all Tuple3
>>>>> grouped by the first field (0).
>>>>> I can obtain the same result either by performing a union of all
>>>>> datasets followed by a group by (the simplest implementation) or by
>>>>> joining all of them pairwise ((((A->B)->C)->D)...). I don't know if
>>>>> there is any other solution. When should I use the first approach and
>>>>> when the second? Could you help me understand the internals of the two
>>>>> approaches? I am always wary of using multiple joins when I don't know
>>>>> the exact size of the inputs.
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>
>>>>
>>>
>
