I think so far getExecutionPlan() was only used for debugging purposes and not in programs that would also be executed. You can open a JIRA issue if you think that this would be a valuable feature.
Thanks, Fabian

2017-01-13 16:34 GMT+01:00 Robert Schmidtke <ro.schmid...@gmail.com>:

> Just a side note, I'm guessing there's a bug here:
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java#L68
>
> It should say createProgramPlan("unnamed job", false);
>
> Otherwise I'm getting an exception complaining that no new sinks have been
> added after the last execution. So currently it is not possible for me to
> first get the execution plan and then execute the program.
>
> Robert
>
> On Fri, Jan 13, 2017 at 3:14 PM, Robert Schmidtke <ro.schmid...@gmail.com>
> wrote:
>
>> Hi Fabian,
>>
>> thanks for the quick and comprehensive reply. I'll have a look at the
>> ExecutionPlan using your suggestion to check what actually gets computed,
>> and I'll use the properties as well. If I stumble across something else
>> I'll let you know.
>>
>> Many thanks again!
>> Robert
>>
>> On Fri, Jan 13, 2017 at 2:40 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Robert,
>>>
>>> let me first describe what splits, groups, and partitions are.
>>>
>>> * Partition: This is basically all data that goes through the same task
>>> instance. If you have an operator with a parallelism of 80, you have 80
>>> partitions. When you call sortPartition() you'll have 80 sorted streams; if
>>> you call mapPartition() you iterate over all records in one partition.
>>> * Split: Splits are a concept of InputFormats. An InputFormat can
>>> process several splits. All splits that are processed by the same data
>>> source task make up the partition of that task. So a split is a subset of a
>>> partition. In your case where each task reads exactly one split, the split
>>> is equivalent to the partition.
>>> * Group: A group is based on the groupBy attribute and hence data-driven
>>> and does not depend on the parallelism.
>>> A groupReduce requires a partitioning such that all records with the same
>>> grouping attribute are sent to the same operator, i.e., all are part of the
>>> same partition. Depending on the number of distinct grouping keys (and the
>>> hash-function) a partition can have zero, one, or more groups.
>>>
>>> Now coming to your use case. You have 80 sources running on 5 machines.
>>> All sources on the same machine produce records with the same grouping key
>>> (hostname). You can actually give a hint to Flink that the data returned
>>> by a split is partitioned, grouped, or sorted in a specific way. This works
>>> as follows:
>>>
>>> // String is hostname, Integer is parallel id of the source task
>>> DataSet<Tuple3<String, Integer, Long>> data = env.createInput(YourFormat);
>>> SplitDataProperties<Tuple3<String, Integer, Long>> splitProps =
>>>     ((DataSource<Tuple3<String, Integer, Long>>) data).getSplitDataProperties();
>>> splitProps.splitsGroupedBy(0, 1);
>>> splitProps.splitsPartitionedBy(0, 1);
>>>
>>> With this info, Flink knows that the data returned by our source is
>>> partitioned and grouped. Now you can do groupBy(0,1).reduceGroup(XXX) to
>>> run a local groupReduce operation on each of the 80 tasks (hostname and
>>> parallel index result in 80 keys) and locally reduce the data.
>>> Next step would be another .groupBy(0).reduceGroup() which gives 5
>>> groups (one per hostname) which are distributed across your tasks.
>>>
>>> However, you have to be careful with the SplitDataProperties. If you get
>>> them wrong, the optimizer makes false assumptions and the resulting plan
>>> might not compute what you are looking for.
>>> I'd recommend reading the JavaDocs and playing a bit with this feature to
>>> see how it behaves. ExecutionEnvironment.getExecutionPlan() can help to
>>> figure out what is happening.
>>>
>>> Best,
>>> Fabian
>>>
>>>
>>> 2017-01-13 12:14 GMT+01:00 Robert Schmidtke <ro.schmid...@gmail.com>:
>>>
>>>> Hi all,
>>>>
>>>> I'm having some trouble grasping what the meaning of/difference between
>>>> the following concepts is:
>>>>
>>>> - Split
>>>> - Group
>>>> - Partition
>>>>
>>>> Let me elaborate a bit on the problem I'm trying to solve here. In my
>>>> tests I'm using a 5-node cluster, on which I'm running Flink 1.1.3 in
>>>> standalone mode. Each node has 64G of memory and 32 cores. I'm starting the
>>>> JobManager on one node, and a TaskManager on each node. I'm assigning 16
>>>> slots to each TaskManager, so the overall parallelism is 80 (= 5 TMs x 16
>>>> Slots).
>>>>
>>>> The data I want to process resides in a local folder on each worker
>>>> with the same path (say /tmp/input). There can be arbitrarily many input
>>>> files in each worker's folder. I have written a custom input format that
>>>> round-robin assigns the files to each of the 16 local input splits
>>>> (https://github.com/robert-schmidtke/hdfs-statistics-adapter/blob/master/sfs-analysis/src/main/java/de/zib/sfs/analysis/io/SfsInputFormat.java)
>>>> to obtain a total of 80 input splits that need
>>>> processing. Each split reads zero or more files, parsing the contents into
>>>> records that are emitted correctly. This works as expected.
>>>>
>>>> Now we're getting to the questions. How do these 80 input splits relate
>>>> to groups and partitions? My understanding of a partition is a subset of my
>>>> DataSet<X> that is local to each node. I.e. if I were to repartition the
>>>> data according to some scheme, a shuffling over workers would occur. After
>>>> reading all the data, I have 80 partitions, correct?
>>>>
>>>> What is less clear to me is the concept of a group, i.e. the result of
>>>> a groupBy operation. The input files I have are produced on each worker by
>>>> some other process.
>>>> I first want to do pre-aggregation (I hope that's the term) on each node
>>>> before sending data over the network. The records I'm
>>>> processing contain a 'hostname' attribute, which is set to the worker's
>>>> hostname that processes the data, because the DataSources are local. That
>>>> means the records produced by the worker on host1 always contain the
>>>> attribute hostname=host1. Similar for the other 4 workers.
>>>>
>>>> Now what happens if I do a groupBy("hostname")? How do the workers
>>>> realize that no network transfer is necessary? Is a group a logical
>>>> abstraction, or a physical one (in my understanding a partition is physical
>>>> because it's local to exactly one worker)?
>>>>
>>>> What I'd like to do next is a reduceGroup to merge multiple records
>>>> into one (some custom, yet straightforward, aggregation) and emit another
>>>> record for every couple of input records. Am I correct in assuming that the
>>>> Iterable<X> values passed to the reduce function all have the same hostname
>>>> value? That is, will the operation have a parallelism of 80, where 5x16
>>>> operations will have the same hostname value? Because I have 16 splits per
>>>> host, the 16 reduces on host1 should all receive values with
>>>> hostname=host1, correct? And after the operation has finished, will the
>>>> reduced groups (now actual DataSets again) still be local to the workers?
>>>>
>>>> This is quite a lot to work on I have to admit. I'm happy for any
>>>> hints, advice and feedback on this. If there's need for clarification I'd
>>>> be happy to provide more information.
>>>>
>>>> Thanks a lot in advance!
>>>>
>>>> Robert
>>>>
>>>> --
>>>> My GPG Key ID: 336E2680
>>>>
>>>
>>>
>>
>>
>> --
>> My GPG Key ID: 336E2680
>>
>
>
>
> --
> My GPG Key ID: 336E2680
>
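
For reference, here is a minimal plain-Java sketch (no Flink dependency) of the two-stage aggregation discussed in the quoted reply: first reduce per (hostname, parallel task index), then reduce per hostname. The class name, the Rec type, the method names, and the generated data (one unit of value per record) are all made up for illustration; only the 5 hosts and 16 slots per host come from the setup described in the thread. It merely mimics how groups follow from the distinct keys and says nothing about how Flink actually places or ships data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical illustration only; none of these names exist in Flink.
final class TwoStageAggregationSketch {

    /** One record: hostname, parallel index of the source task, and a value to sum. */
    static final class Rec {
        final String hostname;
        final int taskIndex;
        final long value;

        Rec(String hostname, int taskIndex, long value) {
            this.hostname = hostname;
            this.taskIndex = taskIndex;
            this.value = value;
        }

        String hostname() { return hostname; }
        long value() { return value; }
    }

    /** Simulates hosts x slotsPerHost source tasks, each emitting recordsPerTask records of value 1. */
    static List<Rec> generate(int hosts, int slotsPerHost, int recordsPerTask) {
        List<Rec> out = new ArrayList<>();
        for (int h = 0; h < hosts; h++) {
            for (int s = 0; s < slotsPerHost; s++) {
                int taskIndex = h * slotsPerHost + s;
                for (int r = 0; r < recordsPerTask; r++) {
                    out.add(new Rec("host" + (h + 1), taskIndex, 1L));
                }
            }
        }
        return out;
    }

    /** Stage 1: group by (hostname, taskIndex), which yields one group per source task. */
    static List<Rec> localReduce(List<Rec> input) {
        Map<String, Rec> acc = new TreeMap<>();
        for (Rec r : input) {
            String key = r.hostname + "#" + r.taskIndex;
            acc.merge(key, r, (a, b) -> new Rec(a.hostname, a.taskIndex, a.value + b.value));
        }
        return new ArrayList<>(acc.values());
    }

    /** Stage 2: group the partial results by hostname only, which yields one group per host. */
    static Map<String, Long> globalReduce(List<Rec> partials) {
        return partials.stream().collect(
                Collectors.groupingBy(Rec::hostname, TreeMap::new, Collectors.summingLong(Rec::value)));
    }

    public static void main(String[] args) {
        List<Rec> input = generate(5, 16, 3);
        List<Rec> partials = localReduce(input);
        Map<String, Long> totals = globalReduce(partials);
        System.out.println("groups after stage 1: " + partials.size());
        System.out.println("groups after stage 2: " + totals.size());
    }
}
```

The number of groups in each stage depends only on the distinct key values, not on the parallelism, which is the distinction between a group and a partition made in the reply.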