Hi Komal,

Actually, the choice of partitioning depends on your business logic. If you
want to perform some aggregation logic over a group of records, you must use
keyBy to guarantee correct semantics.
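To illustrate why, here is a plain-Java sketch of what keyBy guarantees. It is not Flink's actual routing (Flink murmur-hashes keys into key groups and then maps key groups to subtasks), and the class and key names are made up, but the guarantee it demonstrates is the same: every record with a given key is routed to the same parallel subtask.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class KeyBySketch {
    // Illustrative only: Flink's real implementation hashes into key
    // groups first, but the invariant is identical -- one key always
    // lands on one subtask.
    static int subtaskFor(String key, int parallelism) {
        return (key.hashCode() & 0x7fffffff) % parallelism;
    }

    public static void main(String[] args) {
        // Hypothetical object IDs, standing in for the "oID" field.
        List<String> oids = List.of("oid-1", "oid-2", "oid-1", "oid-3", "oid-2");

        // Group records by target subtask, as keyBy("oID") would.
        Map<Integer, List<String>> bySubtask = oids.stream()
                .collect(Collectors.groupingBy(k -> subtaskFor(k, 30)));

        // Because one key never spans two subtasks, a per-subtask
        // aggregation (count, sum, window...) is also globally correct.
        bySubtask.forEach((subtask, keys) ->
                System.out.println("subtask " + subtask + " <- " + keys));
    }
}
```

Without that routing guarantee, two instances of an aggregation operator could each see part of a key's records and both emit partial (wrong) results.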
Best,
Vino

On Mon, 9 Dec 2019 at 17:07, Komal Mariam <komal.mar...@gmail.com> wrote:

> Thank you @vino yang <yanghua1...@gmail.com> for the reply. I suspect
> keyBy will be beneficial in those cases where my subsequent operators
> are computationally intensive, their computation time being greater
> than the network reshuffling cost.
>
> Regards,
> Komal
>
> On Mon, 9 Dec 2019 at 15:23, vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi Komal,
>>
>> keyBy (a hash partition, i.e. a logical partition) and rebalance (a
>> physical partition) are both among the partitioning strategies
>> supported by Flink. [1]
>>
>> Generally speaking, partitioning may incur network communication
>> (network shuffles), which adds to the overall time cost. The example
>> you provided may benefit from operator chaining [2] if you remove
>> the keyBy operation.
>>
>> Best,
>> Vino
>>
>> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
>> [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains
>>
>> On Mon, 9 Dec 2019 at 09:11, Komal Mariam <komal.mar...@gmail.com> wrote:
>>
>>> Anyone?
>>>
>>> On Fri, 6 Dec 2019 at 19:07, Komal Mariam <komal.mar...@gmail.com>
>>> wrote:
>>>
>>>> Hello everyone,
>>>>
>>>> I want to get some insight into the keyBy (and rebalance)
>>>> operations. As I understand it, they partition our tasks over the
>>>> defined parallelism and thus should make our pipeline faster.
>>>>
>>>> I am reading a topic that contains 170,000,000 pre-stored records,
>>>> with 11 Kafka partitions and a replication factor of 1. Hence I use
>>>> .setStartFromEarliest() to read the stream.
>>>> My Flink setup is a 4-node cluster with 3 task managers, each
>>>> having 10 cores, and 1 job manager with 6 cores. (10 task slots
>>>> per TM, hence I set the environment parallelism to 30.)
>>>>
>>>> There are about 10,000 object IDs, hence 10,000 keys.
>>>> Right now I'm keeping the number of records fixed to get a handle
>>>> on how fast they're being processed.
>>>>
>>>> When I remove keyBy, I get the same results in 39 secs as opposed
>>>> to 52 secs with keyBy. In fact, even when I vary the parallelism
>>>> down to 10 or below I still get the same extra overhead of 9 to
>>>> 13 secs. My data is mostly uniformly distributed on its key, so I
>>>> can rule out skew. Rebalance likewise has the same latency as keyBy.
>>>>
>>>> What I want to know is: what may be causing this overhead? And is
>>>> there any way to decrease it?
>>>>
>>>> Here's the script I'm running for testing purposes:
>>>> --------------
>>>> DataStream<ObjectNode> JSONStream = env.addSource(
>>>>     new FlinkKafkaConsumer<>("data",
>>>>         new JSONKeyValueDeserializationSchema(false),
>>>>         properties).setStartFromEarliest());
>>>>
>>>> DataStream<Point> myPoints = JSONStream.map(new jsonToPoint());
>>>>
>>>> myPoints.keyBy("oID").filter(new findDistancefromPOI());
>>>>
>>>> public class findDistancefromPOI extends RichFilterFunction<Point> {
>>>>     @Override
>>>>     public boolean filter(Point input) throws Exception {
>>>>         Double distance = computeEuclideanDist(
>>>>             16.4199, 89.974, input.X(), input.Y());
>>>>         return distance > 0;
>>>>     }
>>>> }
>>>>
>>>> Best Regards,
>>>> Komal
>>>
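A footnote on where the 9-to-13-second overhead discussed above comes from: across a keyBy or rebalance boundary every record is serialized, shipped, and deserialized, whereas chained operators hand the object along directly. The sketch below is a loose plain-Java analogy, not Flink's actual serialization stack; the Point record and distance constants are made-up stand-ins for the script's classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;

public class ShuffleOverheadSketch {
    // Toy record standing in for the Point class from the script above.
    record Point(String oid, double x, double y) implements Serializable {}

    // Same shape as computeEuclideanDist(16.4199, 89.974, x, y).
    static double dist(Point p) {
        double dx = p.x() - 16.4199, dy = p.y() - 89.974;
        return Math.sqrt(dx * dx + dy * dy);
    }

    // "Chained": the two operators run back to back in one task and
    // pass the object along by reference -- no serialization.
    static long chainedCount(List<Point> pts) {
        return pts.stream().filter(p -> dist(p) > 0).count();
    }

    // "Shuffled": each record is serialized and deserialized between
    // the operators, as happens across a keyBy/rebalance boundary.
    static long shuffledCount(List<Point> pts) {
        long n = 0;
        try {
            for (Point p : pts) {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                new ObjectOutputStream(bos).writeObject(p);
                Point q = (Point) new ObjectInputStream(
                        new ByteArrayInputStream(bos.toByteArray())).readObject();
                if (dist(q) > 0) n++;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return n;
    }
}
```

Both paths produce the same answer; the shuffled one just pays a per-record serialization cost, which is why removing keyBy (and letting the map and filter chain) runs faster when no per-key state is needed.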