Hi Robert, thanks for opening the ticket.
Regarding injecting grouping or partitioning information, semantic
annotations (forward fields) [1] are probably what you are looking for.

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/index.html#semantic-annotations

2017-01-14 13:59 GMT+01:00 Robert Schmidtke <ro.schmid...@gmail.com>:

> Hi Fabian,
>
> I have opened a ticket for that, thanks.
>
> I have another question: now that I have obtained the proper local
> grouping, I did some aggregation of type [T] -> U, where one aggregated
> object is of type U, containing information of zero or more Ts. The Us
> are still tied to the hostname, and have the property hostname=hostX for
> the workers they're executed on, just like before. Is it possible to
> specify the grouping/partitioning for DataSets that are not DataSources,
> just like you suggested before? Because my guess is that the grouping
> information is lost when going from T to U.
>
> Best and thanks for the great help!
> Robert
>
> On Fri, Jan 13, 2017 at 8:54 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> I think so far getExecutionPlan() was only used for debugging purposes
>> and not in programs that would also be executed.
>> You can open a JIRA issue if you think that this would be a valuable
>> feature.
>>
>> Thanks, Fabian
>>
>> 2017-01-13 16:34 GMT+01:00 Robert Schmidtke <ro.schmid...@gmail.com>:
>>
>>> Just a side note, I'm guessing there's a bug here:
>>> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java#L68
>>>
>>> It should say createProgramPlan("unnamed job", false);
>>>
>>> Otherwise I'm getting an exception complaining that no new sinks have
>>> been added after the last execution. So currently it is not possible
>>> for me to first get the execution plan and then execute the program.
>>>
>>> Robert
>>>
>>> On Fri, Jan 13, 2017 at 3:14 PM, Robert Schmidtke
>>> <ro.schmid...@gmail.com> wrote:
>>>
>>>> Hi Fabian,
>>>>
>>>> thanks for the quick and comprehensive reply. I'll have a look at the
>>>> ExecutionPlan using your suggestion to check what actually gets
>>>> computed, and I'll use the properties as well. If I stumble across
>>>> something else I'll let you know.
>>>>
>>>> Many thanks again!
>>>> Robert
>>>>
>>>> On Fri, Jan 13, 2017 at 2:40 PM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Robert,
>>>>>
>>>>> let me first describe what splits, groups, and partitions are.
>>>>>
>>>>> * Partition: This is basically all data that goes through the same
>>>>>   task instance. If you have an operator with a parallelism of 80,
>>>>>   you have 80 partitions. When you call sortPartition() you'll have
>>>>>   80 sorted streams; if you call mapPartition() you iterate over all
>>>>>   records in one partition.
>>>>> * Split: Splits are a concept of InputFormats. An InputFormat can
>>>>>   process several splits. All splits that are processed by the same
>>>>>   data source task make up the partition of that task. So a split is
>>>>>   a subset of a partition. In your case, where each task reads
>>>>>   exactly one split, the split is equivalent to the partition.
>>>>> * Group: A group is based on the groupBy attribute and hence
>>>>>   data-driven; it does not depend on the parallelism. A groupReduce
>>>>>   requires a partitioning such that all records with the same
>>>>>   grouping attribute are sent to the same operator, i.e., all are
>>>>>   part of the same partition. Depending on the number of distinct
>>>>>   grouping keys (and the hash function) a partition can have zero,
>>>>>   one, or more groups.
>>>>>
>>>>> Now coming to your use case. You have 80 sources running on 5
>>>>> machines. All sources on the same machine produce records with the
>>>>> same grouping key (hostname).
>>>>> You can actually give a hint to Flink that the data returned by a
>>>>> split is partitioned, grouped, or sorted in a specific way. This
>>>>> works as follows:
>>>>>
>>>>> // String is hostname, Integer is parallel id of the source task
>>>>> DataSet<Tuple3<String, Integer, Long>> text =
>>>>>     env.createInput(YourFormat);
>>>>> SplitDataProperties<Tuple3<String, Integer, Long>> splitProps =
>>>>>     ((DataSource<Tuple3<String, Integer, Long>>) text)
>>>>>         .getSplitDataProperties();
>>>>> splitProps.splitsGroupedBy(0, 1);
>>>>> splitProps.splitsPartitionedBy(0, 1);
>>>>>
>>>>> With this info, Flink knows that the data returned by our source is
>>>>> partitioned and grouped. Now you can do
>>>>> groupBy(0, 1).reduceGroup(XXX) to run a local group reduce on each
>>>>> of the 80 tasks (hostname and parallel index result in 80 keys) and
>>>>> locally reduce the data.
>>>>> Next step would be another .groupBy(0).reduceGroup(), which gives 5
>>>>> groups (one per hostname) that are distributed across your tasks.
>>>>>
>>>>> However, you have to be careful with the SplitDataProperties. If you
>>>>> get them wrong, the optimizer makes false assumptions and the
>>>>> resulting plan might not compute what you are looking for.
>>>>> I'd recommend reading the JavaDocs and playing a bit with this
>>>>> feature to see how it behaves.
>>>>> ExecutionEnvironment.getExecutionPlan() can help to figure out what
>>>>> is happening.
>>>>>
>>>>> Best,
>>>>> Fabian
>>>>>
>>>>>
>>>>> 2017-01-13 12:14 GMT+01:00 Robert Schmidtke <ro.schmid...@gmail.com>:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm having some trouble grasping what the meaning of/difference
>>>>>> between the following concepts is:
>>>>>>
>>>>>> - Split
>>>>>> - Group
>>>>>> - Partition
>>>>>>
>>>>>> Let me elaborate a bit on the problem I'm trying to solve here. In
>>>>>> my tests I'm using a 5-node cluster, on which I'm running Flink
>>>>>> 1.1.3 in standalone mode. Each node has 64G of memory and 32 cores.
>>>>>> I'm starting the JobManager on one node, and a TaskManager on each
>>>>>> node.
>>>>>> I'm assigning 16 slots to each TaskManager, so the overall
>>>>>> parallelism is 80 (= 5 TMs x 16 Slots).
>>>>>>
>>>>>> The data I want to process resides in a local folder on each worker
>>>>>> with the same path (say /tmp/input). There can be arbitrarily many
>>>>>> input files in each worker's folder. I have written a custom input
>>>>>> format that round-robin assigns the files to each of the 16 local
>>>>>> input splits
>>>>>> (https://github.com/robert-schmidtke/hdfs-statistics-adapter/blob/master/sfs-analysis/src/main/java/de/zib/sfs/analysis/io/SfsInputFormat.java)
>>>>>> to obtain a total of 80 input splits that need processing. Each
>>>>>> split reads zero or more files, parsing the contents into records
>>>>>> that are emitted correctly. This works as expected.
>>>>>>
>>>>>> Now we're getting to the questions. How do these 80 input splits
>>>>>> relate to groups and partitions? My understanding of a partition is
>>>>>> a subset of my DataSet<X> that is local to each node. I.e. if I
>>>>>> were to repartition the data according to some scheme, a shuffling
>>>>>> over workers would occur. After reading all the data, I have 80
>>>>>> partitions, correct?
>>>>>>
>>>>>> What is less clear to me is the concept of a group, i.e. the result
>>>>>> of a groupBy operation. The input files I have are produced on each
>>>>>> worker by some other process. I first want to do pre-aggregation (I
>>>>>> hope that's the term) on each node before sending data over the
>>>>>> network. The records I'm processing contain a 'hostname' attribute,
>>>>>> which is set to the worker's hostname that processes the data,
>>>>>> because the DataSources are local. That means the records produced
>>>>>> by the worker on host1 always contain the attribute
>>>>>> hostname=host1. Similar for the other 4 workers.
>>>>>>
>>>>>> Now what happens if I do a groupBy("hostname")? How do the workers
>>>>>> realize that no network transfer is necessary?
>>>>>> Is a group a logical abstraction, or a physical one (in my
>>>>>> understanding a partition is physical because it's local to exactly
>>>>>> one worker)?
>>>>>>
>>>>>> What I'd like to do next is a reduceGroup to merge multiple records
>>>>>> into one (some custom, yet straightforward, aggregation) and emit
>>>>>> another record for every couple of input records. Am I correct in
>>>>>> assuming that the Iterable<X> values passed to the reduce function
>>>>>> all have the same hostname value? That is, will the operation have
>>>>>> a parallelism of 80, where 5x16 operations will have the same
>>>>>> hostname value? Because I have 16 splits per host, the 16 reduces
>>>>>> on host1 should all receive values with hostname=host1, correct?
>>>>>> And after the operation has finished, will the reduced groups (now
>>>>>> actual DataSets again) still be local to the workers?
>>>>>>
>>>>>> This is quite a lot to work on, I have to admit. I'm happy for any
>>>>>> hints, advice and feedback on this. If there's need for
>>>>>> clarification I'd be happy to provide more information.
>>>>>>
>>>>>> Thanks a lot in advance!
>>>>>>
>>>>>> Robert
>>>>>>
>>>>>> --
>>>>>> My GPG Key ID: 336E2680
>>>>>>
>>>>>
>>>>
>>>> --
>>>> My GPG Key ID: 336E2680
>>>>
>>>
>>> --
>>> My GPG Key ID: 336E2680
>>>
>>
>
> --
> My GPG Key ID: 336E2680
>
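
To make the key counts in this thread concrete, here is a minimal
plain-Java sketch that does not use Flink at all. It only mimics the
two-step aggregation discussed above for the described setup of 5 hosts
with 16 source tasks each: first a reduce per (hostname, parallel index),
then a reduce per hostname. The class name GroupCountSketch, the Rec type,
the host names host1..host5 and the three records per task are all
hypothetical and chosen purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupCountSketch {

    /** One record: (hostname, parallel index of the source task, value). */
    static final class Rec {
        final String hostname;
        final int parallelIndex;
        final long value;

        Rec(String hostname, int parallelIndex, long value) {
            this.hostname = hostname;
            this.parallelIndex = parallelIndex;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for the sources: each task emits recordsPerTask records of value 1. */
    static List<Rec> simulateSources(int hosts, int tasksPerHost, int recordsPerTask) {
        List<Rec> out = new ArrayList<>();
        for (int h = 1; h <= hosts; h++) {
            for (int t = 0; t < tasksPerHost; t++) {
                for (int r = 0; r < recordsPerTask; r++) {
                    out.add(new Rec("host" + h, t, 1L));
                }
            }
        }
        return out;
    }

    /** Step 1: one group per (hostname, parallelIndex) key; values are summed. */
    static Map<String, Long> reduceByHostAndTask(List<Rec> recs) {
        Map<String, Long> sums = new TreeMap<>();
        for (Rec r : recs) {
            sums.merge(r.hostname + "#" + r.parallelIndex, r.value, Long::sum);
        }
        return sums;
    }

    /** Step 2: one group per hostname; the pre-aggregated sums are merged. */
    static Map<String, Long> reduceByHost(Map<String, Long> perTask) {
        Map<String, Long> sums = new TreeMap<>();
        for (Map.Entry<String, Long> e : perTask.entrySet()) {
            String key = e.getKey();
            String hostname = key.substring(0, key.indexOf('#'));
            sums.merge(hostname, e.getValue(), Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Rec> recs = simulateSources(5, 16, 3);
        Map<String, Long> perTask = reduceByHostAndTask(recs);
        Map<String, Long> perHost = reduceByHost(perTask);
        System.out.println("records:             " + recs.size());
        System.out.println("(host, task) groups: " + perTask.size());
        System.out.println("host groups:         " + perHost.size());
    }
}
```

With these inputs the first step sees 80 distinct keys and the second step
sees 5, one per hostname. In a real Flink job the number of groups is
likewise determined by the distinct key values, not by the parallelism.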