Hi! I'm just getting back to this. Questions:

1. Across operators, do the same key group ids get mapped to the same task managers? E.g. if an item is in key group 1 of operator A and that runs on taskmanager-0, will key group 1 of operator B also run on taskmanager-0?

2. Are there any internal optimizations when shuffling that perform a forward when the key group of the output row hashes to the same machine?
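For reference, a minimal Python sketch of the key-group arithmetic as I understand it from Flink's KeyGroupRangeAssignment. The real implementation murmur-hashes the key's hashCode(); the plain key_hash modulo below is only a stand-in for that:

```python
# Sketch of Flink's key-group-to-subtask mapping (after
# KeyGroupRangeAssignment). Flink computes
# murmurHash(key.hashCode()) % maxParallelism; key_hash here
# is a stand-in for that hash.

def key_group_for(key_hash: int, max_parallelism: int = 128) -> int:
    return key_hash % max_parallelism

def subtask_index_for(key_group: int, parallelism: int,
                      max_parallelism: int = 128) -> int:
    # mirrors KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup
    return key_group * parallelism // max_parallelism

# Two operators with the same parallelism map a given key group to
# the same subtask *index*; whether those subtasks land on the same
# TaskManager depends on slot sharing and scheduling.
kg = key_group_for(42)                    # key group for this key
a = subtask_index_for(kg, parallelism=4)  # subtask of operator A
b = subtask_index_for(kg, parallelism=4)  # subtask of operator B
assert a == b
```

So the subtask indices line up across operators with equal parallelism; co-location on one machine is then a matter of slot sharing, not of the key-group math itself.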
On Thu, Jul 29, 2021 at 4:49 AM Arvid Heise <ar...@apache.org> wrote:

> Afaik you can express the partition key in Table API now, which will be
> used for co-location and optimization. So I'd probably give that a try
> first and convert the Table to DataStream where needed.
>
> On Sat, Jul 24, 2021 at 9:22 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> Thanks Fabian and Senhong!
>>
>> Here's an example diagram of the join that I want to do. There are more
>> layers of joins.
>>
>> https://docs.google.com/presentation/d/17vYTBUIgrdxuYyEYXrSHypFhwwS7NdbyhVgioYMxPWc/edit#slide=id.p
>>
>> 1) Thanks! I'll look into these.
>>
>> 2) I'm using the same key across multiple Kafka topics. I can change the
>> producers and consumers to write to whichever partitions would help.
>>
>> The job is pretty simple right now. No optimizations. We're currently
>> running this on one task manager. The goal of the email was to start
>> thinking about optimizations. If the usual practice is to let Flink
>> regroup the Kafka sources on input, how do teams deal with the serde
>> overhead of this? Just factor it into overhead and allocate more
>> resources?
>>
>> On Fri, Jul 23, 2021 at 3:21 AM Senhong Liu <senhong...@gmail.com> wrote:
>>
>>> Hi Dan,
>>>
>>> 1) If the key doesn't change in the downstream operators and you want
>>> to avoid shuffling, DataStreamUtils#reinterpretAsKeyedStream may be
>>> helpful.
>>>
>>> 2) I am not sure whether you mean that the data are already partitioned
>>> in Kafka and you want to avoid shuffling in Flink when reusing keyBy().
>>> One solution is to partition your data in Kafka the same way it would
>>> be partitioned in Flink by keyBy(). After that, feel free to use
>>> DataStreamUtils#reinterpretAsKeyedStream!
>>>
>>> If your use case is not what I described above, maybe you can provide
>>> us more information.
>>> Best,
>>> Senhong
>>>
>>> On Jul 22, 2021, 7:33 AM +0800, Dan Hill <quietgol...@gmail.com>, wrote:
>>>
>>> Hi.
>>>
>>> 1) If I use the same key in downstream operators (my key is a user id),
>>> will the rows stay on the same TaskManager machine? I join in more info
>>> based on the user id as the key. I'd like for these to stay on the same
>>> machine rather than shuffle a bunch of user-specific info to multiple
>>> task manager machines.
>>>
>>> 2) What are best practices to reduce the number of shuffles when
>>> multiple Kafka topics share similar keys (user id)? E.g. should I make
>>> sure the same key writes to the same partition number and then manually
>>> assign which Flink tasks get which Kafka partitions?
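On (2), one way to line Kafka partitions up with Flink subtasks, as Senhong suggests, is to have the producer pick the partition with the same arithmetic keyBy() would use, so that a source subtask only ever sees its own key groups. A hedged Python sketch: it assumes the partition count equals the source parallelism, and key_hash stands in for Flink's murmurHash of the key's hashCode(), which a real producer would have to replicate byte for byte (Java String.hashCode() semantics included):

```python
def kafka_partition_for(key_hash: int, num_partitions: int,
                        max_parallelism: int = 128) -> int:
    """Pick the Kafka partition whose index matches the Flink subtask
    that keyBy() would route this key to, assuming the source
    parallelism equals num_partitions.

    key_hash is a stand-in for Flink's murmurHash(key.hashCode());
    a real producer must reproduce that hash exactly.
    """
    key_group = key_hash % max_parallelism
    return key_group * num_partitions // max_parallelism
```

If the producer-side hash does not match Flink's exactly, DataStreamUtils#reinterpretAsKeyedStream will silently mis-route keyed state, so this only pays off when both sides are verified against the same set of test keys.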