Xiaowei Jiang created FLINK-4855:
------------------------------------

             Summary: Add partitionedKeyBy to DataStream
                 Key: FLINK-4855
                 URL: https://issues.apache.org/jira/browse/FLINK-4855
             Project: Flink
          Issue Type: Improvement
          Components: DataStream API
            Reporter: Xiaowei Jiang
            Assignee: MaGuowei


After any interesting operation (e.g. reduce) on a KeyedStream, the result 
becomes a DataStream. In many cases, the output logically still has the same 
keys as the KeyedStream, or compatible ones. But to do further operations 
on these keys, we are forced to call keyBy again. This works semantically, but 
is costly in two respects. First, it destroys the possibility of chaining, which 
is one of the most important optimization techniques. Second, keyBy greatly 
expands the connected components of tasks, which has implications for failover 
optimization.

To address this shortcoming, we propose a new operator partitionedKeyBy.

DataStream<T> {
    // Proposed method; T is the element type of the stream.
    public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key)
}

Semantically, DataStream.partitionedKeyBy(key) is equivalent to 
DataStream.keyBy(partitionedKey), where partitionedKey is key plus the task ID 
as an extra field. This guarantees that records from different tasks will never 
produce the same key.
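
The composite-key semantics can be illustrated without Flink. The following is 
only a minimal sketch in plain Java: the class PartitionedKeySketch and its 
methods are hypothetical names, not part of any Flink API, and each inner list 
stands in for the records seen by one parallel task. It shows that the same 
user key on two different tasks ends up in two separate groups.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionedKeySketch {

    /** Hypothetical composite key: (task index, user key). */
    static <K> Map.Entry<Integer, K> partitionedKey(int taskIndex, K key) {
        return Map.entry(taskIndex, key);
    }

    /**
     * Sums values per composite key. The outer list index plays the role of
     * the task ID; each inner list holds that task's (key, value) records.
     */
    static Map<Map.Entry<Integer, String>, Integer> sumPerPartitionedKey(
            List<List<Map.Entry<String, Integer>>> recordsPerTask) {
        Map<Map.Entry<Integer, String>, Integer> sums = new HashMap<>();
        for (int task = 0; task < recordsPerTask.size(); task++) {
            for (Map.Entry<String, Integer> rec : recordsPerTask.get(task)) {
                // Records are grouped by (task, key), never by key alone.
                sums.merge(partitionedKey(task, rec.getKey()), rec.getValue(), Integer::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        List<List<Map.Entry<String, Integer>>> input = List.of(
                List.of(Map.entry("a", 1), Map.entry("a", 2)),   // task 0
                List.of(Map.entry("a", 10), Map.entry("b", 5))); // task 1
        // Key "a" appears on both tasks but is reduced separately on each.
        System.out.println(sumPerPartitionedKey(input));
    }
}
```

Note that, under this reading, the result of a reduce after partitionedKeyBy is 
a per-task aggregate rather than a global one, which is only what you want 
when the stream is already partitioned by a compatible key.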

With this, it's possible to do

ds.keyBy(key1).reduce(func1)
    .partitionedKeyBy(key1).reduce(func2)
    .partitionedKeyBy(key2).reduce(func3);

Most importantly, in certain cases, we will be able to chain these into a 
single vertex.
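
Why chaining becomes possible can be sketched by comparing where a record is 
routed. This is a simplified model in plain Java, not Flink code: RoutingSketch 
and its methods are hypothetical, and keyByTarget uses a plain hash modulo 
parallelism as a stand-in for Flink's real key assignment. With keyBy, the 
target task depends on the key, so any producer may have to send to any 
consumer; with the proposed partitionedKeyBy, the target is always the 
producing task itself.

```java
import java.util.List;

public class RoutingSketch {

    /** Simplified stand-in for hash partitioning (an assumption, not Flink's scheme). */
    static int keyByTarget(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Under the proposed semantics a record stays on the task that produced it. */
    static int partitionedKeyByTarget(int producingTask) {
        return producingTask;
    }

    /** Counts records that would cross a task boundary under plain keyBy. */
    static int crossTaskRecords(List<List<String>> keysPerTask) {
        int parallelism = keysPerTask.size();
        int moved = 0;
        for (int task = 0; task < parallelism; task++) {
            for (String key : keysPerTask.get(task)) {
                if (keyByTarget(key, parallelism) != task) {
                    moved++;
                }
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        // Outer index = producing task; inner lists = keys of its records.
        List<List<String>> keysPerTask = List.of(List.of("a", "b"), List.of("a"));
        System.out.println("moved by keyBy: " + crossTaskRecords(keysPerTask));
        System.out.println("partitionedKeyBy target for task 0: " + partitionedKeyByTarget(0));
    }
}
```

Because the partitionedKeyBy target never differs from the producer, no data 
exchange between tasks is needed in this model, which is the property that 
would let consecutive operators share one vertex.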




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
