Thanks Biao, I see. To achieve what I want, I need to work with KeyedStream. I downloaded the Flink source code to study it and adapt KeyedStream to my needs. I am not sure yet, but it looks like a lot of work because, as far as I understand, the key-groups have to be predictable [1], and changing this touches many other parts of the source code.
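
To make "predictable" concrete, here is a minimal sketch of how I understand the key-group idea from [1]. It is plain Java and not Flink's own code: the class and method names are hypothetical, and I use the raw hashCode() where Flink, as far as I know, additionally applies a murmur hash. The point is only that a key always maps to the same key group, and each parallel instance owns a contiguous range of key groups.

```java
/** Simplified stand-in for key-group assignment; hypothetical names, not Flink's API. */
final class KeyGroupSketch {

    // The key group depends only on the key's hash and maxParallelism,
    // so it stays the same no matter how many parallel instances are running.
    // Assumption: Flink also scrambles the hash (murmur); that step is skipped here.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each parallel instance owns a contiguous range of key groups.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        for (String word : new String[] {"flink", "stream", "key"}) {
            int keyGroup = keyGroupFor(word, maxParallelism);
            System.out.printf(
                "%s -> key group %d, instance %d of 4, instance %d of 8%n",
                word,
                keyGroup,
                operatorIndexFor(keyGroup, maxParallelism, 4),
                operatorIndexFor(keyGroup, maxParallelism, 8));
        }
    }
}
```

Changing the parallelism from 4 to 8 moves whole key groups between instances but never changes the key group of a key. Any custom partitioning I add would have to preserve that property.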
However, if I can guarantee that the key-groups stay predictable, I will be able to rebalance, rescale, etc. the keys across other worker nodes.

[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html

Thanks,
Felipe
*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com*

On Mon, Sep 23, 2019 at 9:51 AM Biao Liu <[email protected]> wrote:

> Hi Felipe,
>
> A Flink job graph is DAG-based. It seems that you set an "edge property"
> (partitioner) several times.
> Flink does not support multiple partitioners on one edge. The later one
> overrides the earlier ones. That means the "keyBy" overrides the
> "rebalance" and "partitionByPartial".
>
> You could insert some nodes between these partitioners to satisfy your
> requirement. For example,
> `sourceDataStream.rebalance().map(...).keyBy(0).sum(1).print();`
>
> Thanks,
> Biao /'bɪ.aʊ/
>
> On Thu, 19 Sep 2019 at 16:49, Felipe Gutierrez <[email protected]> wrote:
>
>> I am executing a data stream application which uses rebalance. Basically,
>> I am counting words using "src -> split -> physicalPartitionStrategy ->
>> keyBy -> sum -> print". I am running 3 examples: one without a physical
>> partition strategy, one with the rebalance strategy [1], and one with the
>> partial partition strategy from [2].
>> I know that the keyBy operator actually undoes what rebalance is doing,
>> because it splits the stream by key, and if I have a stream with skewed
>> keys, one parallel instance of the operator after the keyBy will be
>> overloaded. However, I was expecting that *before the keyBy* I would have
>> a balanced stream, which is not happening.
>>
>> Basically, I want to see the difference in records/sec between operators
>> when I use rebalance or any other physical partition strategy. However,
>> I found no difference in the records/sec metrics of all operators when
>> running the 3 different physical partition strategies. Screenshots of
>> Prometheus+Grafana are attached.
>>
>> Maybe I am measuring the wrong operator, or maybe I am not using
>> rebalance in the right way, or I am not using a good use case to test the
>> rebalance transformation.
>> I am also testing a different physical partitioning in order to later try
>> to implement the issue "FLINK-1725 New Partitioner for better load
>> balancing for skewed data" [2]. I am not sure, but I guess that all
>> physical partition strategies have to be implemented on a KeyedStream.
>>
>> DataStream<String> text = env.addSource(new WordSource());
>> // split lines in strings
>> DataStream<Tuple2<String, Integer>> tokenizer = text.flatMap(new Tokenizer());
>> // choose a partitioning strategy
>> DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer;
>> DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer.rebalance();
>> DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer.partitionByPartial(0);
>> // count
>> partitionedStream.keyBy(0).sum(1).print();
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning
>> [2] https://issues.apache.org/jira/browse/FLINK-1725
>>
>> Thanks,
>> Felipe
>>
>> *--*
>> *-- Felipe Gutierrez*
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com*
