Thank you so much for the detailed reply. I understand the usage of keyBy a lot better now. You are correct about the time variation too. We will apply different network settings and extend our datasets to check performance across different use cases.
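For anyone following along who wants to try the network-buffer tuning discussed below, these are the relevant `flink-conf.yaml` keys as documented for the Flink 1.9-era releases. The values shown are the documented defaults, given here only as a starting point, not as recommendations:

```yaml
# Network buffer memory as a fraction of the TaskManager's JVM memory,
# clamped to the min/max bounds below.
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 64mb
taskmanager.network.memory.max: 1gb

# Size of each memory segment (buffer). Smaller segments tend to lower
# latency; larger ones favor throughput.
taskmanager.memory.segment-size: 32kb
```

Changing these requires a TaskManager restart, so it is easiest to benchmark a few settings side by side.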
On Mon, 9 Dec 2019 at 20:45, Arvid Heise <ar...@ververica.com> wrote:

> Hi Komal,
>
> As a general rule of thumb, you want to avoid network shuffles as much as
> possible. As Vino pointed out, you need to reshuffle if you need to group
> by key. Another frequent use case is rebalancing data in case of a heavy
> skew. Since neither applies to you, removing the keyBy is the best option.
>
> If you want to retain it because you may experience skew in the future,
> there are only a couple of things you can do. You may tinker with
> networking settings to have smaller/larger network buffers (smaller = less
> latency, larger = more throughput) [1]. Of course, you get better results
> if you have a faster network (running in the cloud, you can play around
> with different adapters). You could also try whether fewer/more machines
> are actually faster (fewer machines = less network traffic, more machines
> = more compute power).
>
> In any case, your data volume is so low that I would probably not optimize
> too much. We are talking about seconds, and the times may vary largely
> from run to run because of the low data volume. If you want to test the
> throughput as a POC for a larger volume, I'd either generate a larger
> sample or replicate it to get more reliable numbers. In any case, try to
> have your final use case in mind when deciding on an option.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#configuring-the-network-buffers
>
> On Mon, Dec 9, 2019 at 10:25 AM vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi Komal,
>>
>> Actually, the main factor in choosing the type of partitioning is your
>> business logic. If you want to do some aggregation logic based on a
>> group, you must choose keyBy to guarantee correct semantics.
>>
>> Best,
>> Vino
>>
>> Komal Mariam <komal.mar...@gmail.com> 于2019年12月9日周一 下午5:07写道:
>>
>>> Thank you @vino yang <yanghua1...@gmail.com> for the reply.
>>> I suspect keyBy will be beneficial in those cases where my subsequent
>>> operators are computationally intensive, i.e. where their computation
>>> time exceeds the network reshuffling cost.
>>>
>>> Regards,
>>> Komal
>>>
>>> On Mon, 9 Dec 2019 at 15:23, vino yang <yanghua1...@gmail.com> wrote:
>>>
>>>> Hi Komal,
>>>>
>>>> keyBy (hash partitioning, a logical partitioning) and rebalance (a
>>>> physical partitioning) are both among the partitioning schemes
>>>> supported by Flink. [1]
>>>>
>>>> Generally speaking, partitioning may incur network communication
>>>> (network shuffle) costs, which can add to the overall time. The
>>>> example you provided may benefit from operator chaining [2] if you
>>>> remove the keyBy operation.
>>>>
>>>> Best,
>>>> Vino
>>>>
>>>> [1]:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
>>>> [2]:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains
>>>>
>>>> Komal Mariam <komal.mar...@gmail.com> 于2019年12月9日周一 上午9:11写道:
>>>>
>>>>> Anyone?
>>>>>
>>>>> On Fri, 6 Dec 2019 at 19:07, Komal Mariam <komal.mar...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello everyone,
>>>>>>
>>>>>> I want to get some insights on the keyBy (and rebalance) operations,
>>>>>> as according to my understanding they partition our tasks over the
>>>>>> defined parallelism and thus should make our pipeline faster.
>>>>>>
>>>>>> I am reading a topic which contains 170,000,000 pre-stored records,
>>>>>> with 11 Kafka partitions and a replication factor of 1. Hence I use
>>>>>> .setStartFromEarliest() to read the stream.
>>>>>> My Flink setup is a 4-node cluster with 3 task managers, each having
>>>>>> 10 cores, and 1 job manager with 6 cores. (10 task slots per TM,
>>>>>> hence I set environment parallelism to 30.)
>>>>>>
>>>>>> There are about 10,000 object IDs, hence 10,000 keys.
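To make the hash-partitioning cost discussed above more concrete, here is a heavily simplified, non-Flink sketch of how a keyBy-style shuffle routes records: the key is hashed into one of `maxParallelism` key groups, and each key group maps to exactly one subtask, so most records have to cross the network to reach their owning subtask. (Flink's real implementation lives in `KeyGroupRangeAssignment` and applies murmur hashing on top of `hashCode()`; the formulas here are only illustrative.)

```java
// Simplified sketch of keyBy-style routing -- NOT Flink's actual code.
public class KeyGroupSketch {

    // Hash the key into a fixed range of key groups. Flink additionally
    // murmur-hashes the hashCode; plain floorMod stands in here.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Map a key group to the subtask that owns it at the current
    // operator parallelism (same formula Flink uses for this step).
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default for small jobs
        int parallelism = 30;     // as in the cluster described above
        String oid = "object-42"; // hypothetical object ID

        int group = keyGroupFor(oid, maxParallelism);
        int subtask = subtaskFor(group, maxParallelism, parallelism);
        System.out.println(oid + " -> key group " + group
                + " -> subtask " + subtask);
    }
}
```

Because the target subtask is fixed per key, every upstream subtask must be able to send to every downstream one; that all-to-all exchange (serialization, buffering, network hops) is the constant overhead you see even without skew.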
>>>>>> Right now I'm keeping the number of records fixed to get a handle on
>>>>>> how fast they're being processed.
>>>>>>
>>>>>> When I remove keyBy, I get the same results in 39 secs as opposed to
>>>>>> 52 secs with keyBy. In fact, even when I vary the parallelism down
>>>>>> to 10 or below, I still get the same extra overhead of 9 to 13 secs.
>>>>>> My data is mostly uniformly distributed on its key, so I can rule
>>>>>> out skew. Rebalance likewise has the same latency as keyBy.
>>>>>>
>>>>>> What I want to know is: what may be causing this overhead? And is
>>>>>> there any way to decrease it?
>>>>>>
>>>>>> Here's the script I'm running for testing purposes:
>>>>>> --------------
>>>>>> DataStream<ObjectNode> JSONStream = env.addSource(new
>>>>>> FlinkKafkaConsumer<>("data", new
>>>>>> JSONKeyValueDeserializationSchema(false), properties).setStartFromEarliest());
>>>>>>
>>>>>> DataStream<Point> myPoints = JSONStream.map(new jsonToPoint());
>>>>>>
>>>>>> myPoints.keyBy("oID").filter(new findDistancefromPOI());
>>>>>>
>>>>>> public class findDistancefromPOI extends RichFilterFunction<Point> {
>>>>>>     public boolean filter(Point input) throws Exception {
>>>>>>         Double distance = computeEuclideanDist(
>>>>>>             16.4199, 89.974, input.X(), input.Y());
>>>>>>         return distance > 0;
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> Best Regards,
>>>>>> Komal
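The thread never shows the body of the `computeEuclideanDist` helper used in the filter above; one plausible implementation (an assumption on my part, since only the call site appears in the script) would be:

```java
// Hypothetical implementation of the computeEuclideanDist helper called
// in findDistancefromPOI above -- the thread does not show its body.
public class DistanceUtil {

    static double computeEuclideanDist(double x1, double y1,
                                       double x2, double y2) {
        double dx = x1 - x2;
        double dy = y1 - y2;
        return Math.sqrt(dx * dx + dy * dy); // straight-line distance
    }

    public static void main(String[] args) {
        // 3-4-5 triangle: distance from (0,0) to (3,4) is 5
        System.out.println(computeEuclideanDist(0, 0, 3, 4));
    }
}
```

Note that `distance > 0` keeps every point except ones exactly at the POI, so the benchmark is effectively measuring per-record compute plus shuffle cost rather than doing selective filtering, which suits its purpose here.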