Hi Felipe,

If I understand correctly, you want to solve data skew caused by imbalanced keys?
There is a common strategy for this kind of problem: pre-aggregation, like the combiner in MapReduce. Sadly, AFAIK Flink does not support pre-aggregation currently, so I'm afraid you have to implement it yourself. For example, introduce a function that caches some data (time- or count-based). This function should be placed before "keyBy", on the non-keyed stream, and it pre-aggregates just like the aggregation after "keyBy" does. In this way, the skewed keyed data would be reduced a lot. I also found a suggestion [1] from Fabian, although it's from a long time ago. Hope it helps.

1. https://stackoverflow.com/questions/47825565/apache-flink-how-can-i-compute-windows-with-local-pre-aggregation

Thanks,
Biao /'bɪ.aʊ/

On Mon, 23 Sep 2019 at 19:51, Felipe Gutierrez wrote:

> thanks Biao,
>
> I see. To achieve what I want to do, I need to work with KeyedStream. I
> downloaded the Flink source code to learn and alter the KeyedStream to my
> needs. I am not sure, but it is a lot of work because, as far as I understood,
> the key-groups have to be predictable [1], and altering this touches a lot
> of other parts of the source code.
>
> However, if I guarantee that they (key-groups) are predictable, I will be
> able to rebalance, rescale, ... the keys to other worker nodes.
>
> [1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
>
> Thanks,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Mon, Sep 23, 2019 at 9:51 AM Biao Liu wrote:
>
>> Hi Felipe,
>>
>> A Flink job graph is a DAG. It seems that you set an "edge property"
>> (partitioner) several times.
>> Flink does not support multiple partitioners on one edge. The later one
>> overrides the earlier ones. That means the "keyBy" overrides the "rebalance" and
>> "partitionByPartial".
>>
>> You could insert some nodes between these partitioners to satisfy your
>> requirement. For example,
>> `sourceDataStream.rebalance().map(...).keyBy(0).sum(1).print();`
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>> On Thu, 19 Sep 2019 at 16:49, Felipe Gutierrez wrote:
>>
>>> I am executing a data stream application which uses rebalance. Basically,
>>> I am counting words using "src -> split -> physicalPartitionStrategy ->
>>> keyBy -> sum -> print". I am running 3 examples: one without a physical
>>> partition strategy, one with the rebalance strategy [1], and one with the
>>> partial partition strategy from [2].
>>> I know that the keyBy operator actually undoes what rebalance is doing,
>>> because it splits the stream by key, and if I have a stream with skewed keys,
>>> one parallel instance of the operator after the keyBy will be overloaded.
>>> However, I was expecting that *before the keyBy* I would have a
>>> balanced stream, which is not happening.
>>>
>>> Basically, I want to see the difference in records/sec between operators
>>> when I use rebalance or any other physical partition strategy. However,
>>> I found no difference in the records/sec metrics of any operator across
>>> the 3 different physical partition strategies. Screenshots of
>>> Prometheus+Grafana are attached.
>>>
>>> Maybe I am measuring the wrong operator, or maybe I am not using
>>> rebalance in the right way, or my use case is not a good test for the
>>> rebalance transformation.
>>> I am also testing a different physical partitioner, to later try to
>>> implement the issue "FLINK-1725 New Partitioner for better load balancing
>>> for skewed data" [2]. I am not sure, but I guess that all physical
>>> partition strategies have to be implemented on a KeyedStream.
>>>
>>> DataStream<String> text = env.addSource(new WordSource());
>>> // split lines into words
>>> DataStream<Tuple2<String, Integer>> tokenizer = text.flatMap(new Tokenizer());
>>> // choose ONE partitioning strategy per run (keep the other two commented out):
>>> // (a) no physical partitioning
>>> DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer;
>>> // (b) round-robin rebalance
>>> // DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer.rebalance();
>>> // (c) partial partitioning: custom method from the FLINK-1725 prototype, not part of the Flink API
>>> // DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer.partitionByPartial(0);
>>> // count; keyBy(0) sets the partitioner of this edge and overrides the choice above
>>> partitionedStream.keyBy(0).sum(1).print();
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning
>>> [2] https://issues.apache.org/jira/browse/FLINK-1725
>>>
>>> thanks,
>>> Felipe
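The behaviour discussed in this thread can be illustrated with a small plain-Java model, not Flink code, of how records from one upstream instance are spread over downstream instances. Round-robin (what `rebalance()` does) balances the load regardless of key, while partitioning by key hash (what `keyBy()` does) sends every record of a hot key to the same instance. The class name `PartitionSkewDemo` and its methods are hypothetical. Two simplifications are assumed: Flink's real `keyBy` maps keys to key groups via a hash of the key, for which `String.hashCode()` modulo parallelism stands in here, and Flink's round-robin does not necessarily start at instance 0. Since `keyBy(0)` directly after `rebalance()` overrides the rebalance on that edge, all three runs would end up with the hash-by-key distribution, which may explain why the metrics looked identical.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model (not Flink code) of record distribution across parallel instances.
// Simplifications: String.hashCode() stands in for Flink's key-group hashing, and
// round-robin starts at instance 0.
class PartitionSkewDemo {

    // Round-robin, like rebalance(): record i goes to instance i % parallelism, whatever its key.
    static int[] roundRobin(List<String> keys, int parallelism) {
        int[] load = new int[parallelism];
        for (int i = 0; i < keys.size(); i++) {
            load[i % parallelism]++;
        }
        return load;
    }

    // Hash partitioning, like keyBy(): all records with the same key land on the same instance.
    static int[] hashByKey(List<String> keys, int parallelism) {
        int[] load = new int[parallelism];
        for (String key : keys) {
            load[Math.floorMod(key.hashCode(), parallelism)]++;
        }
        return load;
    }

    public static void main(String[] args) {
        // Skewed word stream: "the" accounts for 8 of 10 records.
        List<String> words = Arrays.asList(
                "the", "the", "the", "the", "the", "the", "the", "the", "flink", "skew");
        System.out.println("round-robin: " + Arrays.toString(roundRobin(words, 3))); // [4, 3, 3]
        System.out.println("hash-by-key: " + Arrays.toString(hashByKey(words, 3)));
    }
}
```

With the hash-based version, whichever instance owns "the" receives at least 8 of the 10 records, no matter which physical partitioner was declared before `keyBy`.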

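The pre-aggregation idea from the latest reply (a count-based cache on the non-keyed stream, placed before `keyBy`) can be sketched as follows. This is a plain-Java model of the buffering logic only, under stated assumptions: the class `LocalPreAggregator` and its methods are hypothetical, the Flink wiring (e.g. wrapping it in a function before `keyBy`) is omitted, and a production version would also need to keep the buffer in checkpointed state and, for a time-based variant, flush on timers. The downstream `keyBy(0).sum(1)` still yields correct totals because summing partial sums gives the same result as summing the individual counts.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical count-based combiner, modelled in plain Java (no Flink dependency).
// In a Flink job this buffering would sit on the non-keyed stream before keyBy();
// Flink wiring and checkpointing of the buffer are intentionally omitted.
class LocalPreAggregator {
    private final int maxBufferedRecords;                 // count-based flush threshold
    private final Map<String, Integer> partialSums = new HashMap<>();
    private int bufferedRecords = 0;

    LocalPreAggregator(int maxBufferedRecords) {
        this.maxBufferedRecords = maxBufferedRecords;
    }

    // Folds one (word, count) record into the buffer; returns the partial sums
    // once the threshold is reached, otherwise an empty list.
    List<Map.Entry<String, Integer>> add(String word, int count) {
        partialSums.merge(word, count, Integer::sum);
        bufferedRecords++;
        if (bufferedRecords >= maxBufferedRecords) {
            return flush();
        }
        return new ArrayList<>();
    }

    // Emits one partial (word, sum) record per distinct buffered word and resets the buffer.
    List<Map.Entry<String, Integer>> flush() {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : partialSums.entrySet()) {
            out.add(new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue()));
        }
        partialSums.clear();
        bufferedRecords = 0;
        return out;
    }

    public static void main(String[] args) {
        LocalPreAggregator agg = new LocalPreAggregator(5);
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        String[] words = {"the", "the", "the", "flink", "the", "the", "the", "the", "skew", "the"};
        for (String w : words) {
            emitted.addAll(agg.add(w, 1));
        }
        emitted.addAll(agg.flush()); // final flush, e.g. when the input ends
        // 10 input records shrink to 4 partial records for keyBy().sum()
        System.out.println(words.length + " records in, " + emitted.size() + " partial records out: " + emitted);
    }
}
```

The threshold trades latency for reduction: a larger buffer merges more records of the hot key before they reach `keyBy`, but holds results back longer.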