Hi Hiroyuki,

> First, multiple messages with the same key go to the same consumer
> thread as described below.
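A Key_Shared subscription dispatches by hashing each message key into a fixed hash space and giving each consumer a range of that space. The simplified Java model below illustrates the idea. It is a sketch under stated assumptions, not the broker's code: the hash space is assumed to have 65536 slots, each joining consumer is assumed to take the upper half of the currently largest range, and `String.hashCode()` stands in for the Murmur3 hash. The class `AutoSplitSketch` and its methods are hypothetical. The model shows two things: a joining consumer takes over slots (and so keys) that previously belonged to an existing consumer, and halving ranges leaves consumers with unequal shares whenever the consumer count is not a power of two.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical model of auto-split hash ranges for a Key_Shared subscription.
 * Assumptions (not the broker's actual implementation):
 *  - the hash space has 65536 slots;
 *  - a joining consumer takes the upper half of the current largest range;
 *  - String.hashCode() stands in for Murmur3.
 */
public class AutoSplitSketch {
    static final int HASH_SLOTS = 1 << 16; // assumed size of the hash space

    // Inclusive end slot of each contiguous range -> consumer owning that range.
    private final TreeMap<Integer, String> rangeEnds = new TreeMap<>();

    public void addConsumer(String name) {
        if (rangeEnds.isEmpty()) {
            rangeEnds.put(HASH_SLOTS - 1, name); // first consumer owns the whole space
            return;
        }
        // Find the largest existing range [bestStart, bestEnd]; the first one wins ties.
        int start = 0, bestStart = 0, bestEnd = -1;
        for (int end : rangeEnds.keySet()) {
            if (end - start > bestEnd - bestStart) {
                bestStart = start;
                bestEnd = end;
            }
            start = end + 1;
        }
        // Split it in half: the old owner keeps the lower half, the newcomer gets the upper half.
        int mid = bestStart + (bestEnd - bestStart) / 2;
        String oldOwner = rangeEnds.get(bestEnd);
        rangeEnds.put(mid, oldOwner);
        rangeEnds.put(bestEnd, name);
    }

    /** Consumer owning a given hash slot. */
    public String ownerOfSlot(int slot) {
        return rangeEnds.ceilingEntry(slot).getValue();
    }

    /** Consumer selected for a key (stand-in hash, see class comment). */
    public String select(String key) {
        return ownerOfSlot(Math.floorMod(key.hashCode(), HASH_SLOTS));
    }

    /** Number of hash slots owned by each consumer. */
    public Map<String, Integer> slotsPerConsumer() {
        Map<String, Integer> counts = new HashMap<>();
        int start = 0;
        for (Map.Entry<Integer, String> e : rangeEnds.entrySet()) {
            counts.merge(e.getValue(), e.getKey() - start + 1, Integer::sum);
            start = e.getKey() + 1;
        }
        return counts;
    }

    public static void main(String[] args) {
        AutoSplitSketch selector = new AutoSplitSketch();
        for (int i = 0; i < 19; i++) selector.addConsumer("c" + i);
        // Remember who owns every slot, then let a 20th consumer join.
        String[] before = new String[HASH_SLOTS];
        for (int s = 0; s < HASH_SLOTS; s++) before[s] = selector.ownerOfSlot(s);
        selector.addConsumer("c19");
        int moved = 0;
        for (int s = 0; s < HASH_SLOTS; s++) {
            if (!before[s].equals(selector.ownerOfSlot(s))) moved++;
        }
        System.out.println("slots reassigned to c19: " + moved);
        System.out.println("slots per consumer: " + selector.slotsPerConsumer());
    }
}
```

In this model, 20 consumers end up with 12 consumers owning 4096 slots each and 8 owning 2048, so even a perfectly uniform hash would give some consumers twice the traffic of others. Adding the 20th consumer moves 2048 slots, and every key hashing into them, to the newcomer. Any additional skew on top of that would come from how the real key hashes fall across the slots.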
When a new consumer joins a subscription, the key range is split again, so it is expected that messages for a given key may move from one consumer to another. If the set of consumers in a subscription is stable, all messages with the same key are always dispatched to the same consumer.

You can consider using a "sticky" consumer to consume messages. This ensures that messages in a given key range always go to that consumer.

> Second, messages are almost equally balanced between consumer threads.

The message distribution relies heavily on key hashing, which is based on the Murmur3 hash. I think the skew comes from this part. One way to improve it is to split the key range based on traffic, instead of splitting it equally when a new consumer joins, similar to what we recently did for bundle splitting.

I have cc'ed Penghui on this email to get his thoughts on this.

- Sijie

On Wed, Mar 25, 2020 at 3:43 AM Hiroyuki Yamada wrote:

> Hi,
>
> I'm checking the Key_Shared feature and ran into some unexpected behavior.
> Roughly, what happened is that multiple messages with the same key go to
> different consumers (consumer threads) even though Key_Shared is set.
> Also, the distribution of messages to consumers is quite skewed.
>
> Let me explain in more detail what I did, what happened, and what I
> expected.
>
> What I did:
> 1. Created a partitioned topic named
>    "persistent://my-tenant/my-namespace/my-topic8" (# of partitions is 8)
> 2. Ran a producer (the code is here:
>    https://github.com/feeblefakie/misc/blob/master/pulsar/src/main/java/MyProducer.java
>    )
>    - It just produces 10000 messages with SinglePartition. Each message
>      has a key (from "0" to "999").
> 3. Ran a consumer process (the code is here:
>    https://github.com/feeblefakie/misc/blob/master/pulsar/src/main/java/MyConsumer.java
>    )
>    - It consumes messages with Key_Shared and 20 consumer threads set
>      by ClientBuilder.listenerThreads(20).
>
> What happened:
> Multiple messages with the same key go to different consumer threads.
> For example, the following is a set of messages with key "100"; some
> go to b4fa8 and others go to 96ee9.
>
> #ConsumerName key
> b4fa8 100
> b4fa8 100
> b4fa8 100
> b4fa8 100
> 96ee9 100
> 96ee9 100
> 96ee9 100
> 96ee9 100
> 96ee9 100
> 96ee9 100
>
> Also, the distribution of messages is pretty skewed.
>
> # numOfMessages consumerName
> 289 15588
> 346 21dcb
> 138 284ca
> 379 29138
> 195 30276
> 533 500ae
> 375 5c4cb
> 338 654bf
> 194 699fe
> 271 6c6c8
> 220 73691
> 152 7f5d1
> 372 96ee9
> 359 9b55d
> 315 a22fd
> 196 a752a
> 4243 b4fa8 <- this one consumes by far the most messages
> 387 cc11a
> 307 d0f5d
> 391 ec8b8
>
> What I expected:
>
> First, multiple messages with the same key go to the same consumer
> thread as described below.
> http://pulsar.apache.org/docs/en/concepts-messaging/#key_shared
> Second, messages are almost equally balanced between consumer threads.
>
> So, am I doing something wrong, or is my expectation wrong?
> If my expectation is correct, it would be great if anyone could tell me
> how to make it happen.
>
> Best regards,
> Hiroyuki
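To check both expectations against consumer output like the listings above, a small stdlib-only Java sketch can flag keys that reached more than one consumer and measure how uneven the per-consumer counts are. The class `KeySharedLogCheck` and the `(consumerName, key)` record format are hypothetical, modeled on the `#ConsumerName key` listing; the skew metric (busiest consumer divided by the mean) is one simple choice among many.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical checker for Key_Shared consumer logs of (consumerName, key) records. */
public class KeySharedLogCheck {

    /** Keys delivered to more than one consumer, mapped to the consumers that received them. */
    static Map<String, Set<String>> splitKeys(List<String[]> records) {
        Map<String, Set<String>> consumersByKey = new TreeMap<>();
        for (String[] r : records) {
            consumersByKey.computeIfAbsent(r[1], k -> new TreeSet<>()).add(r[0]);
        }
        // Keep only keys that violated key affinity.
        consumersByKey.values().removeIf(consumers -> consumers.size() < 2);
        return consumersByKey;
    }

    /** Busiest consumer's message count divided by the mean count (1.0 means perfectly even). */
    static double skew(List<String[]> records) {
        if (records.isEmpty()) return 0.0; // no data, nothing to measure
        Map<String, Integer> perConsumer = new HashMap<>();
        for (String[] r : records) perConsumer.merge(r[0], 1, Integer::sum);
        int max = 0;
        for (int n : perConsumer.values()) max = Math.max(max, n);
        double mean = (double) records.size() / perConsumer.size();
        return max / mean;
    }

    public static void main(String[] args) {
        // The key "100" listing from the message above.
        List<String[]> records = new ArrayList<>();
        for (int i = 0; i < 4; i++) records.add(new String[] {"b4fa8", "100"});
        for (int i = 0; i < 6; i++) records.add(new String[] {"96ee9", "100"});
        System.out.println("keys seen on several consumers: " + splitKeys(records));
        System.out.printf("max/mean messages per consumer: %.2f%n", skew(records));
    }
}
```

Applied to the distribution table above, the counts sum to 10000 messages over 20 consumers, a mean of 500 per consumer, so b4fa8's 4243 messages give a max/mean ratio of about 8.5.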
