Hi Xiaowei,

I like the idea of reusing a partitioning and thus saving a shuffle operation. It would be great if we could fail at runtime in case the partitioning has changed somehow. That way a logical user error won't go unnoticed.
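A rough sketch of what such a check could look like, e.g. based on Flink's key-group assignment utilities (the wrapper class and method names below are hypothetical, not part of the proposal):

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    // Hypothetical guard: verify that a record's key still belongs to this
    // subtask under the current parallelism, so a changed partitioning fails
    // fast instead of silently producing wrong results.
    public final class PartitioningGuard {

        private PartitioningGuard() {}

        public static void checkKeyOwnedBySubtask(
                Object key, int maxParallelism, int parallelism, int thisSubtask) {
            int keyGroup =
                KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
            int owner = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                maxParallelism, parallelism, keyGroup);
            if (owner != thisSubtask) {
                throw new IllegalStateException("Partitioning changed: key "
                    + key + " belongs to subtask " + owner
                    + " but was seen on subtask " + thisSubtask);
            }
        }
    }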
Would it make sense to name the method partitionedByKey(...) because the data is already partitioned?

Cheers,
Till

On Thu, Oct 20, 2016 at 9:53 AM, Xiaowei Jiang <xiaow...@gmail.com> wrote:

> After we do any interesting operations (e.g. reduce) on a KeyedStream, the
> result becomes a DataStream. In a lot of cases, the output still has the
> same or compatible keys as the KeyedStream (logically). But to do further
> operations on these keys, we are forced to use keyBy again. This works
> semantically, but is costly in two respects. First, it destroys the
> possibility of chaining, which is one of the most important optimization
> techniques. Second, keyBy will greatly expand the connected components of
> tasks, which has implications for failover optimization.
>
> To address this shortcoming, we propose a new operator, partitionedKeyBy:
>
>     DataStream {
>         public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key)
>     }
>
> Semantically, DataStream.partitionedKeyBy(key) is equivalent to
> DataStream.keyBy(partitionedKey), where partitionedKey is the key plus the
> task id as an extra field. This guarantees that records from different
> tasks will never produce the same keys.
>
> With this, it's possible to do
>
>     ds.keyBy(key1).reduce(func1)
>       .partitionedKeyBy(key1).reduce(func2)
>       .partitionedKeyBy(key2).reduce(func3);
>
> Most importantly, in certain cases, we will be able to chain these into a
> single vertex.
>
> Please share your thoughts. The JIRA is at
> https://issues.apache.org/jira/browse/FLINK-4855
>
> Xiaowei
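For concreteness, the proposed semantics could be emulated on the current API by tagging each record with the producing subtask's index and keying on the compound (taskId, key) field. This emulation is illustrative only and still pays the shuffle that partitionedKeyBy would avoid (class and method names below are hypothetical):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    public class PartitionedKeyBySemantics {

        // Emulates partitionedKeyBy for a String stream whose records are
        // their own keys; a real version would also carry a user KeySelector.
        public static KeyedStream<Tuple2<Integer, String>, Tuple> emulate(
                DataStream<String> ds) {
            return ds
                // Attach the producing subtask's index -- the "task id"
                // extra field from the proposal.
                .map(new RichMapFunction<String, Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> map(String value) {
                        return Tuple2.of(
                            getRuntimeContext().getIndexOfThisSubtask(), value);
                    }
                })
                // Key on the compound (taskId, key): records from different
                // subtasks can never produce the same compound key.
                .keyBy(0, 1);
        }
    }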