Hi Robert,

thanks for opening the ticket.

Regarding injecting grouping or partitioning information, semantic
annotations (forward fields) [1] are probably what you are looking for.
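
To illustrate why forwarding the key field matters, here is a minimal
plain-Java sketch (no Flink dependency). All names in it (Event, Summary,
partitionOf, aggregate) are made up for illustration, and the hash-modulo
partitioner is an assumption, not Flink's actual one. In a real job you would
declare the forwarded field on the operator as described in [1].

```java
import java.util.Arrays;
import java.util.List;

/** Toy model, not Flink code: a forwarded key field keeps records in their partition. */
final class ForwardedFieldSketch {

    /** Hypothetical input type T: hostname plus a payload. */
    static final class Event {
        final String hostname;
        final long bytes;

        Event(String hostname, long bytes) {
            this.hostname = hostname;
            this.bytes = bytes;
        }
    }

    /** Hypothetical aggregate type U: hostname is copied through unchanged. */
    static final class Summary {
        final String hostname;
        final long totalBytes;
        final int count;

        Summary(String hostname, long totalBytes, int count) {
            this.hostname = hostname;
            this.totalBytes = totalBytes;
            this.count = count;
        }
    }

    /** Assumed partitioner: hash of the key modulo the parallelism. */
    static int partitionOf(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** [T] -> U for one group; works for zero or more events. */
    static Summary aggregate(String hostname, List<Event> events) {
        long total = 0;
        for (Event e : events) {
            total += e.bytes;
        }
        return new Summary(hostname, total, events.size());
    }

    /** True if the aggregate maps to the same partition as each of its inputs. */
    static boolean staysInPartition(String hostname, List<Event> events, int parallelism) {
        Summary s = aggregate(hostname, events);
        int target = partitionOf(s.hostname, parallelism);
        for (Event e : events) {
            if (partitionOf(e.hostname, parallelism) != target) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Event> host1 = Arrays.asList(new Event("host1", 10), new Event("host1", 32));
        System.out.println(staysInPartition("host1", host1, 80));
    }
}
```

Because the U record carries the same hostname as its Ts, the same
partitioner sends it to the same place, so no reshuffling would be needed.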

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/index.html#semantic-annotations

2017-01-14 13:59 GMT+01:00 Robert Schmidtke <ro.schmid...@gmail.com>:

> Hi Fabian,
>
> I have opened a ticket for that, thanks.
>
> I have another question: now that I have obtained the proper local
> grouping, I did some aggregation of type [T] -> U, where one aggregated
> object is of type U, containing information of zero or more Ts. The Us are
> still tied to the hostname, and have the property hostname=hostX for the
> workers they're executed on, just like before. Is it possible to specify
> the grouping/partitioning for DataSets that are not DataSources, just like
> you suggested before? Because my guess is that the grouping information is
> lost when going from T to U.
>
> Best and thanks for the great help!
> Robert
>
> On Fri, Jan 13, 2017 at 8:54 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> I think so far getExecutionPlan() was only used for debugging purposes and
>> not in programs that would also be executed.
>> You can open a JIRA issue if you think that this would be a valuable feature.
>>
>> Thanks, Fabian
>>
>> 2017-01-13 16:34 GMT+01:00 Robert Schmidtke <ro.schmid...@gmail.com>:
>>
>>> Just a side note, I'm guessing there's a bug here:
>>> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java#L68
>>>
>>> It should say createProgramPlan("unnamed job", false);
>>>
>>> Otherwise I'm getting an exception complaining that no new sinks have
>>> been added after the last execution. So currently it is not possible for me
>>> to first get the execution plan and then execute the program.
>>>
>>> Robert
>>>
>>> On Fri, Jan 13, 2017 at 3:14 PM, Robert Schmidtke <
>>> ro.schmid...@gmail.com> wrote:
>>>
>>>> Hi Fabian,
>>>>
>>>> thanks for the quick and comprehensive reply. I'll have a look at the
>>>> ExecutionPlan using your suggestion to check what actually gets computed,
>>>> and I'll use the properties as well. If I stumble across something else
>>>> I'll let you know.
>>>>
>>>> Many thanks again!
>>>> Robert
>>>>
>>>> On Fri, Jan 13, 2017 at 2:40 PM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Robert,
>>>>>
>>>>> let me first describe what splits, groups, and partitions are.
>>>>>
>>>>> * Partition: This is basically all data that goes through the same
>>>>> task instance. If you have an operator with a parallelism of 80, you have
>>>>> 80 partitions. When you call sortPartition() you'll have 80 sorted
>>>>> streams; if you call mapPartition() you iterate over all records in one
>>>>> partition.
>>>>> * Split: Splits are a concept of InputFormats. An InputFormat can
>>>>> process several splits. All splits that are processed by the same data
>>>>> source task make up the partition of that task. So a split is a subset of
>>>>> a partition. In your case where each task reads exactly one split, the
>>>>> split is equivalent to the partition.
>>>>> * Group: A group is based on the groupBy attribute and hence
>>>>> data-driven and does not depend on the parallelism. A groupReduce requires
>>>>> a partitioning such that all records with the same grouping attribute are
>>>>> sent to the same task instance, i.e., all are part of the same partition.
>>>>> Depending on the number of distinct grouping keys (and the hash function)
>>>>> a partition can have zero, one, or more groups.
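
The relationship between groups and partitions described above can be
sketched in plain Java (no Flink dependency). The class and method names are
hypothetical, and the hash-modulo partitioner is only an assumption used for
illustration; Flink's real partitioner differs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Toy model, not Flink code: how groups (keys) are spread over partitions. */
final class PartitionGroupSketch {

    /** Returns, for each partition, the distinct grouping keys routed to it. */
    static List<Set<String>> groupsPerPartition(List<String> keys, int parallelism) {
        List<Set<String>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new TreeSet<>());
        }
        for (String key : keys) {
            // Assumed partitioner: hash modulo parallelism.
            partitions.get(Math.floorMod(key.hashCode(), parallelism)).add(key);
        }
        return partitions;
    }

    /** Counts partitions that received no group at all. */
    static int emptyPartitions(List<Set<String>> partitions) {
        int empty = 0;
        for (Set<String> p : partitions) {
            if (p.isEmpty()) {
                empty++;
            }
        }
        return empty;
    }

    public static void main(String[] args) {
        // Five distinct hostnames, some repeated, spread over 80 partitions.
        List<String> keys = Arrays.asList(
            "host1", "host2", "host3", "host4", "host5", "host1", "host3");
        List<Set<String>> partitions = groupsPerPartition(keys, 80);
        System.out.println("empty partitions: " + emptyPartitions(partitions));
    }
}
```

With only five distinct keys and 80 partitions, most partitions hold zero
groups, and every group lives in exactly one partition.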
>>>>>
>>>>> Now coming to your use case. You have 80 sources running on 5
>>>>> machines. All sources on the same machine produce records with the same
>>>>> grouping key (hostname). You can actually give Flink a hint that the
>>>>> data returned by a split is partitioned, grouped, or sorted in a specific
>>>>> way. This works as follows:
>>>>>
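>>>>> import org.apache.flink.api.java.DataSet;
>>>>> import org.apache.flink.api.java.io.SplitDataProperties;
>>>>> import org.apache.flink.api.java.operators.DataSource;
>>>>> import org.apache.flink.api.java.tuple.Tuple3;
>>>>>
>>>>> // String is hostname, Integer is parallel id of the source task.
>>>>> // YourFormat stands for an instance of your InputFormat.
>>>>> DataSet<Tuple3<String, Integer, Long>> text = env.createInput(YourFormat);
>>>>> SplitDataProperties<Tuple3<String, Integer, Long>> splitProps =
>>>>>     ((DataSource<Tuple3<String, Integer, Long>>) text).getSplitDataProperties();
>>>>> splitProps.splitsGroupedBy(0, 1);
>>>>> splitProps.splitsPartitionedBy(0, 1);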
>>>>>
>>>>> With this info, Flink knows that the data returned by our source is
>>>>> partitioned and grouped. Now you can do groupBy(0,1).reduceGroup(XXX) to
>>>>> run a local GroupReduce operation on each of the 80 tasks (hostname and
>>>>> parallel index result in 80 keys) and locally reduce the data.
>>>>> The next step would be another .groupBy(0).reduceGroup(), which gives 5
>>>>> groups (one per hostname) that are distributed across your tasks.
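
The two-step reduction described above (first per hostname and parallel
index, then per hostname) can be sketched in plain Java without Flink. The
Rec type and the method names are hypothetical, and a simple sum stands in
for whatever aggregation is actually used.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model, not Flink code: local pre-aggregation followed by a global reduce. */
final class TwoStageAggregationSketch {

    /** Hypothetical record: (hostname, parallel index of the source task, value). */
    static final class Rec {
        final String host;
        final int taskId;
        final long value;

        Rec(String host, int taskId, long value) {
            this.host = host;
            this.taskId = taskId;
            this.value = value;
        }
    }

    /** Stage 1: reduce per (host, taskId), which needs no data from other tasks. */
    static Map<SimpleImmutableEntry<String, Integer>, Long> localReduce(List<Rec> recs) {
        Map<SimpleImmutableEntry<String, Integer>, Long> out = new HashMap<>();
        for (Rec r : recs) {
            out.merge(new SimpleImmutableEntry<>(r.host, r.taskId), r.value, Long::sum);
        }
        return out;
    }

    /** Stage 2: reduce the stage-1 results per host only. */
    static Map<String, Long> globalReduce(Map<SimpleImmutableEntry<String, Integer>, Long> local) {
        Map<String, Long> out = new TreeMap<>();
        for (Map.Entry<SimpleImmutableEntry<String, Integer>, Long> e : local.entrySet()) {
            out.merge(e.getKey().getKey(), e.getValue(), Long::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> recs = Arrays.asList(
            new Rec("host1", 0, 1), new Rec("host1", 0, 2),
            new Rec("host1", 1, 3), new Rec("host2", 0, 4));
        System.out.println(globalReduce(localReduce(recs)));
    }
}
```

Only the already reduced stage-1 results have to be combined in stage 2,
which is the point of pre-aggregating before anything goes over the network.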
>>>>>
>>>>> However, you have to be careful with the SplitDataProperties. If you
>>>>> get them wrong, the optimizer makes false assumptions and the resulting
>>>>> plan might not compute what you are looking for.
>>>>> I'd recommend reading the JavaDocs and playing a bit with this feature to
>>>>> see how it behaves. ExecutionEnvironment.getExecutionPlan() can help
>>>>> to figure out what is happening.
>>>>>
>>>>> Best,
>>>>> Fabian
>>>>>
>>>>>
>>>>> 2017-01-13 12:14 GMT+01:00 Robert Schmidtke <ro.schmid...@gmail.com>:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm having some trouble grasping what the meaning of/difference
>>>>>> between the following concepts is:
>>>>>>
>>>>>> - Split
>>>>>> - Group
>>>>>> - Partition
>>>>>>
>>>>>> Let me elaborate a bit on the problem I'm trying to solve here. In my
>>>>>> tests I'm using a 5-node cluster, on which I'm running Flink 1.1.3 in
>>>>>> standalone mode. Each node has 64G of memory and 32 cores. I'm starting
>>>>>> the JobManager on one node, and a TaskManager on each node. I'm assigning
>>>>>> 16 slots to each TaskManager, so the overall parallelism is 80 (= 5 TMs x
>>>>>> 16 slots).
>>>>>>
>>>>>> The data I want to process resides in a local folder on each worker
>>>>>> with the same path (say /tmp/input). There can be arbitrarily many input
>>>>>> files in each worker's folder. I have written a custom input format that
>>>>>> round-robin assigns the files to each of the 16 local input splits
>>>>>> (https://github.com/robert-schmidtke/hdfs-statistics-adapter/blob/master/sfs-analysis/src/main/java/de/zib/sfs/analysis/io/SfsInputFormat.java)
>>>>>> to obtain a total of 80 input splits that need processing. Each split
>>>>>> reads zero or more files, parsing the contents into records that are
>>>>>> emitted correctly. This works as expected.
>>>>>>
>>>>>> Now we're getting to the questions. How do these 80 input splits
>>>>>> relate to groups and partitions? My understanding of a partition is a
>>>>>> subset of my DataSet<X> that is local to each node. I.e. if I were to
>>>>>> repartition the data according to some scheme, a shuffling over workers
>>>>>> would occur. After reading all the data, I have 80 partitions, correct?
>>>>>>
>>>>>> What is less clear to me is the concept of a group, i.e. the result
>>>>>> of a groupBy operation. The input files I have are produced on each
>>>>>> worker by some other process. I first want to do pre-aggregation (I hope
>>>>>> that's the term) on each node before sending data over the network. The
>>>>>> records I'm processing contain a 'hostname' attribute, which is set to the
>>>>>> worker's hostname that processes the data, because the DataSources are
>>>>>> local. That means the records produced by the worker on host1 always
>>>>>> contain the attribute hostname=host1. Similar for the other 4 workers.
>>>>>>
>>>>>> Now what happens if I do a groupBy("hostname")? How do the workers
>>>>>> realize that no network transfer is necessary? Is a group a logical
>>>>>> abstraction, or a physical one? (In my understanding a partition is
>>>>>> physical because it's local to exactly one worker.)
>>>>>>
>>>>>> What I'd like to do next is a reduceGroup to merge multiple records
>>>>>> into one (some custom, yet straightforward, aggregation) and emit another
>>>>>> record for every couple of input records. Am I correct in assuming that
>>>>>> the Iterable<X> values passed to the reduce function all have the same
>>>>>> hostname value? That is, will the operation have a parallelism of 80,
>>>>>> where 5x16 operations will have the same hostname value? Because I have
>>>>>> 16 splits per host, the 16 reduces on host1 should all receive values
>>>>>> with hostname=host1, correct? And after the operation has finished, will
>>>>>> the reduced groups (now actual DataSets again) still be local to the
>>>>>> workers?
>>>>>>
>>>>>> This is quite a lot to work on I have to admit. I'm happy for any
>>>>>> hints, advice and feedback on this. If there's need for clarification I'd
>>>>>> be happy to provide more information.
>>>>>>
>>>>>> Thanks a lot in advance!
>>>>>>
>>>>>> Robert
>>>>>>
>>>>>> --
>>>>>> My GPG Key ID: 336E2680
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> My GPG Key ID: 336E2680
>>>>
>>>
>>>
>>>
>>> --
>>> My GPG Key ID: 336E2680
>>>
>>
>>
>
>
> --
> My GPG Key ID: 336E2680
>
