I think so far getExecutionPlan() was only used for debugging purpose and
not in programs that would also be executed.
You can open a JIRA issue if you think that this would a valuable feature.

Thanks, Fabian

2017-01-13 16:34 GMT+01:00 Robert Schmidtke <ro.schmid...@gmail.com>:

> Just a side note, I'm guessing there's a bug here: https://github.com/
> apache/flink/blob/master/flink-clients/src/main/java/
> org/apache/flink/client/program/ContextEnvironment.java#L68
>
> It should say createProgramPlan("unnamed job", false);
>
> Otherwise I'm getting an exception complaining that no new sinks have been
> added after the last execution. So currently it is not possible for me to
> first get the execution plan and then run execute the program.
>
> Robert
>
> On Fri, Jan 13, 2017 at 3:14 PM, Robert Schmidtke <ro.schmid...@gmail.com>
> wrote:
>
>> Hi Fabian,
>>
>> thanks for the quick and comprehensive reply. I'll have a look at the
>> ExecutionPlan using your suggestion to check what actually gets computed,
>> and I'll use the properties as well. If I stumble across something else
>> I'll let you know.
>>
>> Many thanks again!
>> Robert
>>
>> On Fri, Jan 13, 2017 at 2:40 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Robert,
>>>
>>> let me first describe what splits, groups, and partitions are.
>>>
>>> * Partition: This is basically all data that goes through the same task
>>> instance. If you have an operator with a parallelism of 80, you have 80
>>> partitions. When you call sortPartition() you'll have 80 sorted streams, if
>>> you call mapPartition you iterate over all records in one partition.
>>> * Split: Splits are a concept of InputFormats. An InputFormat can
>>> process several splits. All splits that are processed by the same data
>>> source task make up the partition of that task. So a split is a subset of a
>>> partition. In your case where each task reads exactly one split, the split
>>> is equivalent to the partition.
>>> * Group: A group is based on the groupBy attribute and hence data-driven
>>> and does not depend on the parallelism. A groupReduce requires a
>>> partitioning such that all records with the same grouping attribute are
>>> sent to the same operator, i.e., all are part of the same partition.
>>> Depending on the number of distinct grouping keys (and the hash-function) a
>>> partition can have zero, one, or more groups.
>>>
>>> Now coming to your use case. You have 80 sources running on 5 machines.
>>> All source on the same machine produce records with the same grouping key
>>> (hostname). You can actually give a hint to Flink, that the data returned
>>> by a split is partitioned, grouped, or sorted in a specific way. This works
>>> as follows:
>>>
>>> // String is hostname, Integer is parallel id of the source task
>>> DataSet<Tuple3<String, Integer, Long>> = env.createInput(YourFormat);
>>> SplitDataProperties<Tuple3<String, Integer, Long>> splitProps =
>>> ((DataSource)text).getSplitDataProperties();
>>> splitProps.splitsGroupedBy(0,1)
>>> splitProps.splitsPartitionedBy(0,1)
>>>
>>> With this info, Flink knows that the data returned by our source is
>>> partitioned and grouped. Now you can do groupBy(0,1).groupReduce(XXX) to
>>> run a local groupReduce operation on each of the 80 tasks (hostname and
>>> parallel index result in 80 keys) and locally reduce the data.
>>> Next step would be another .groupBy(0).groupReduce() which gives 16
>>> groups which are distributed across your tasks.
>>>
>>> However, you have to be careful with the SplitDataProperties. If you get
>>> them wrong, the optimizer makes false assumption and the resulting plan
>>> might not compute what you are looking for.
>>> I'd recommend to read the JavaDocs and play a bit with this feature to
>>> see how it behaves. ExecutionEnvironment.getExecutionPlan() can help to
>>> figure out what is happening.
>>>
>>> Best,
>>> Fabian
>>>
>>>
>>> 2017-01-13 12:14 GMT+01:00 Robert Schmidtke <ro.schmid...@gmail.com>:
>>>
>>>> Hi all,
>>>>
>>>> I'm having some trouble grasping what the meaning of/difference between
>>>> the following concepts is:
>>>>
>>>> - Split
>>>> - Group
>>>> - Partition
>>>>
>>>> Let me elaborate a bit on the problem I'm trying to solve here. In my
>>>> tests I'm using a 5-node cluster, on which I'm running Flink 1.1.3 in
>>>> standalone mode. Each node has 64G of memory and 32 cores. I'm starting the
>>>> JobManager on one node, and a TaskManager on each node. I'm assigning 16
>>>> slots to each TaskManager, so the overall parallelism is 80 (= 5 TMs x 16
>>>> Slots).
>>>>
>>>> The data I want to process resides in a local folder on each worker
>>>> with the same path (say /tmp/input). There can be arbitrarily many input
>>>> files in each worker's folder. I have written a custom input format that
>>>> round-robin assigns the files to each of the 16 local input splits (
>>>> https://github.com/robert-schmidtke/hdfs-statistics-adapter
>>>> /blob/master/sfs-analysis/src/main/java/de/zib/sfs/analysis/
>>>> io/SfsInputFormat.java) to obtain a total of 80 input splits that need
>>>> processing. Each split reads zero or more files, parsing the contents into
>>>> records that are emitted correctly. This works as expected.
>>>>
>>>> Now we're getting to the questions. How do these 80 input splits relate
>>>> to groups and partitions? My understanding of a partition is a subset of my
>>>> DataSet<X> that is local to each node. I.e. if I were to repartition the
>>>> data according to some scheme, a shuffling over workers would occur. After
>>>> reading all the data, I have 80 partitions, correct?
>>>>
>>>> What is less clear to me is the concept of a group, i.e. the result of
>>>> a groupBy operation. The input files I have are produced on each worker by
>>>> some other process. I first want to do pre-aggregation (I hope that's the
>>>> term) on each node before sending data over the network. The records I'm
>>>> processing contain a 'hostname' attribute, which is set to the worker's
>>>> hostname that processes the data, because the DataSources are local. That
>>>> means the records produced by the worker on host1 always contain the
>>>> attribute hostname=host1. Similar for the other 4 workers.
>>>>
>>>> Now what happens if I do a groupBy("hostname")? How do the workers
>>>> realize that no network transfer is necessary? Is a group a logical
>>>> abstraction, or a physical one (in my understanding a partition is physical
>>>> because it's local to exactly one worker).
>>>>
>>>> What I'd like to do next is a reduceGroup to merge multiple records
>>>> into one (some custom, yet straightforward, aggregation) and emit another
>>>> record for every couple of input records. Am I correct in assuming that the
>>>> Iterable<X> values passed to the reduce function all have the same hostname
>>>> value? That is, will the operation have a parallelism of 80, where 5x16
>>>> operations will have the same hostname value? Because I have 16 splits per
>>>> host, the 16 reduces on host1 should all receive values with
>>>> hostname=host1, correct? And after the operation has finished, will the
>>>> reduced groups (now actual DataSets again) still be local to the workers?
>>>>
>>>> This is quite a lot to work on I have to admit. I'm happy for any
>>>> hints, advice and feedback on this. If there's need for clarification I'd
>>>> be happy to provide more information.
>>>>
>>>> Thanks a lot in advance!
>>>>
>>>> Robert
>>>>
>>>> --
>>>> My GPG Key ID: 336E2680
>>>>
>>>
>>>
>>
>>
>> --
>> My GPG Key ID: 336E2680
>>
>
>
>
> --
> My GPG Key ID: 336E2680
>

Reply via email to