Hi Hiroyuki,

I have run your tests many times. If I start the consumer first and make sure the subscription has no backlog, and then start the producer to publish messages, the result is always correct. To make the output easier to read, I added a TreeSet that stores "msg.getKey() + " -> " + consumer.getConsumerName()" and printed the set every 100 messages. Here is the result:
```
939 -> 79ac9
94 -> e5d9a
940 -> f6cfd
941 -> faf34
942 -> faf34
943 -> ccc34
943 -> f6cfd
944 -> ccc34
945 -> e3e44
946 -> e0e68
947 -> 79ac9
948 -> 1ca5d
949 -> ccc34
95 -> 1ca5d
950 -> 42635
951 -> f6cfd
952 -> e955a
953 -> e955a
954 -> 22e5a
955 -> 42635
956 -> 22e5a
957 -> e3e44
958 -> 26fdb
959 -> 16293
96 -> f69f0
960 -> e955a
961 -> bf9f5
962 -> f6cfd
963 -> ccc34
964 -> e3e44
965 -> 2970b
966 -> 16293
967 -> 16293
968 -> 26fdb
969 -> 1ca5d
97 -> 1ca5d
970 -> 53ec0
971 -> 22e5a
972 -> e0e68
973 -> f69f0
974 -> 79ac9
975 -> f6cfd
976 -> bf9f5
977 -> 2970b
977 -> 95de2
978 -> 4eecf
979 -> 22e5a
98 -> 1ca5d
980 -> f6cfd
981 -> 2970b
982 -> e955a
983 -> f6cfd
984 -> ccc34
985 -> bf9f5
986 -> ccc34
986 -> f6cfd
987 -> 1ca5d
988 -> ccc34
989 -> faf34
99 -> 79ac9
990 -> 16293
991 -> f6cfd
992 -> 4eecf
993 -> e3e44
994 -> e5d9a
995 -> bf9f5
996 -> 2970b
997 -> 22e5a
998 -> e955a
999 -> 16293
=====================================
```

I only copied part of the log. If I start the producer first (with the subscription already created), the problem occurs. I think this is what Sijie described: the consumers are created one by one, so the hash range is constantly being split.

Hope this helps.

Thanks,
Penghui

On Thu, Mar 26, 2020 at 9:58 AM, Hiroyuki Yamada <[email protected]> wrote:
> Hi,
>
> Thank you Sijie
>
> > If there is a new consumer joining a subscription, the key
> > distribution will be split. So you should expect to see messages of a
> > key being dispatched to one consumer and then to another. If the
> > consumers of a subscription are stable, you will see that messages
> > with the same key are always dispatched to one consumer.
>
> Then, it is not working as expected.
> I think the consumers of the subscription are stable, since I just ran this:
> https://github.com/feeblefakie/misc/blob/master/pulsar/src/main/java/MyConsumer.java
>
> But messages with the same key are not going to the same consumer, as
> shown in the previous email.
> Did I miss something?
>
> Thanks,
> Hiroyuki
>
> On Thu, Mar 26, 2020 at 9:39 AM Sijie Guo <[email protected]> wrote:
> >
> > Hi Hiroyuki,
> >
> > > First, multiple messages with the same key go to the same consumer
> > > thread as described below.
> >
> > If there is a new consumer joining a subscription, the key distribution
> > will be split. So you should expect to see messages of a key being
> > dispatched to one consumer and then to another. If the consumers of a
> > subscription are stable, you will see that messages with the same key
> > are always dispatched to one consumer.
> >
> > You can consider using a "sticky" consumer to consume messages. This
> > ensures that messages in a given key range always go to that consumer.
> >
> > > Second, messages are almost equally balanced between consumer threads.
> >
> > The message distribution heavily relies on key hashing, which is based
> > on the Murmur3 hash. I think the skew is coming from this part.
> > One way to improve this is to split the key range based on traffic
> > instead of splitting it equally when a new consumer joins, similar to
> > what we recently did for bundle splitting. I have cc'ed Penghui on this
> > email to get his thoughts.
> >
> > - Sijie
> >
> > On Wed, Mar 25, 2020 at 3:43 AM Hiroyuki Yamada <[email protected]>
> > wrote:
> >>
> >> Hi,
> >>
> >> I'm trying out the Key_Shared feature and ran into some unexpected
> >> behavior. Roughly, multiple messages with the same key go to
> >> different consumers (consumer threads) even though Key_Shared is set.
> >> Also, the distribution of messages to consumers is quite skewed.
> >>
> >> Let me explain what I did, what happened, and what I expected
> >> in more detail.
> >>
> >> What I did:
> >> 1. Created a partitioned topic named
> >>    "persistent://my-tenant/my-namespace/my-topic8" (# of partitions is 8)
> >> 2. Ran a producer (the code is here:
> >>    https://github.com/feeblefakie/misc/blob/master/pulsar/src/main/java/MyProducer.java)
> >>    - It just produces 10000 messages with SinglePartition. Each message
> >>      has a key (from "0" to "999").
> >> 3. Ran a consumer process (the code is here:
> >>    https://github.com/feeblefakie/misc/blob/master/pulsar/src/main/java/MyConsumer.java)
> >>    - It consumes messages with Key_Shared and 20 consumer threads, set
> >>      by ClientBuilder.listenerThreads(20).
> >>
> >> What happened:
> >> Multiple messages with the same key go to different consumer threads.
> >> For example, the following messages all have key "100", but
> >> some go to b4fa8 and others go to 96ee9.
> >>
> >> #ConsumerName key
> >> b4fa8 100
> >> b4fa8 100
> >> b4fa8 100
> >> b4fa8 100
> >> 96ee9 100
> >> 96ee9 100
> >> 96ee9 100
> >> 96ee9 100
> >> 96ee9 100
> >> 96ee9 100
> >>
> >> Also, the distribution of messages is pretty skewed.
> >>
> >> # numOfMessages consumerName
> >> 289 15588
> >> 346 21dcb
> >> 138 284ca
> >> 379 29138
> >> 195 30276
> >> 533 500ae
> >> 375 5c4cb
> >> 338 654bf
> >> 194 699fe
> >> 271 6c6c8
> >> 220 73691
> >> 152 7f5d1
> >> 372 96ee9
> >> 359 9b55d
> >> 315 a22fd
> >> 196 a752a
> >> 4243 b4fa8 <- this consumes most of the messages
> >> 387 cc11a
> >> 307 d0f5d
> >> 391 ec8b8
> >>
> >> What I expected:
> >>
> >> First, multiple messages with the same key go to the same consumer
> >> thread, as described here:
> >> http://pulsar.apache.org/docs/en/concepts-messaging/#key_shared
> >> Second, messages are almost equally balanced between consumer threads.
> >>
> >> So, am I doing something wrong, or is my expectation wrong?
> >> If my expectation is correct, it would be great if anyone could tell me
> >> how to make it happen.
> >>
> >> Best regards,
> >> Hiroyuki
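Penghui's debugging aid (a TreeSet of "key -> consumer" strings printed every 100 messages) can be sketched roughly as follows. This is a hypothetical helper, not code from the thread: the class name KeyConsumerTracker and its methods are illustrative, and plain strings stand in for msg.getKey() and consumer.getConsumerName() so it runs without the Pulsar client. It additionally keeps a per-key set of consumers so that keys delivered to more than one consumer can be listed directly. Because the entries are Strings, the TreeSet sorts them lexicographically, which is why "94 -> e5d9a" appears between the 939 and 940 entries in the log.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical debugging helper: remembers which consumer received each key
// and prints the sorted "key -> consumer" pairs every REPORT_EVERY messages.
// Plain strings stand in for msg.getKey() and consumer.getConsumerName().
class KeyConsumerTracker {
    static final int REPORT_EVERY = 100;

    // String ordering is lexicographic, so "94" sorts between "939" and "940".
    private final TreeSet<String> pairs = new TreeSet<>();
    // key -> every consumer that has received that key so far
    private final Map<String, TreeSet<String>> owners = new TreeMap<>();
    private final List<String> reports = new ArrayList<>();
    private long received = 0;

    // Call once per received message; synchronized because listeners may run on several threads.
    synchronized void record(String key, String consumerName) {
        pairs.add(key + " -> " + consumerName);
        owners.computeIfAbsent(key, k -> new TreeSet<>()).add(consumerName);
        received++;
        if (received % REPORT_EVERY == 0) {
            String report = String.join("\n", pairs) + "\n=====";
            reports.add(report);
            System.out.println(report);
        }
    }

    // Keys observed on more than one consumer, in lexicographic key order.
    synchronized List<String> splitKeys() {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, TreeSet<String>> e : owners.entrySet()) {
            if (e.getValue().size() > 1) result.add(e.getKey());
        }
        return result;
    }

    synchronized int reportCount() {
        return reports.size();
    }

    public static void main(String[] args) {
        KeyConsumerTracker tracker = new KeyConsumerTracker();
        // The first two pairs are taken from the log above.
        tracker.record("943", "ccc34");
        tracker.record("943", "f6cfd");
        tracker.record("944", "ccc34");
        System.out.println(tracker.splitKeys()); // prints [943]
    }
}
```

In a real consumer, record would be called from the message listener once per message; with listenerThreads(20), listeners for different consumers may run concurrently, hence the synchronized methods.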
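Sijie's and Penghui's point, that each newly joined consumer splits the existing hash range so that some keys change owner, can be illustrated with a toy selector. This is a simplified model under explicit assumptions, not Pulsar's implementation: the 65536-slot hash space, the rule that the newcomer takes the upper half of the widest range, and FNV-1a standing in for Murmur3 (chosen only because it fits in a few lines) are all assumptions of this sketch, and the class name is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

// Toy model of a hash-range selector that splits ranges as consumers join.
// ASSUMPTIONS (not Pulsar's code): 65536 slots, the newcomer takes the upper
// half of the widest existing range, and FNV-1a replaces Murmur3.
class AutoSplitSelectorSketch {
    static final int RANGE_SIZE = 1 << 16;

    // Inclusive upper bound of each range -> consumer owning (previous bound, bound].
    private final TreeMap<Integer, String> rangeEnds = new TreeMap<>();

    void addConsumer(String name) {
        if (rangeEnds.isEmpty()) {
            rangeEnds.put(RANGE_SIZE - 1, name); // first consumer owns every slot
            return;
        }
        // Find the widest range (first one wins on ties).
        int bestStart = 0, bestEnd = -1, start = 0;
        for (Map.Entry<Integer, String> e : rangeEnds.entrySet()) {
            int end = e.getKey();
            if (end - start > bestEnd - bestStart) {
                bestStart = start;
                bestEnd = end;
            }
            start = end + 1;
        }
        int mid = bestStart + (bestEnd - bestStart) / 2;
        String previousOwner = rangeEnds.get(bestEnd);
        rangeEnds.put(mid, previousOwner); // old owner keeps [bestStart, mid]
        rangeEnds.put(bestEnd, name);      // newcomer takes [mid + 1, bestEnd]
    }

    String select(String key) {
        int slot = Math.floorMod(fnv1a(key), RANGE_SIZE);
        return rangeEnds.ceilingEntry(slot).getValue(); // never null: RANGE_SIZE - 1 is always a key
    }

    // 32-bit FNV-1a over the UTF-8 bytes; int overflow gives the mod-2^32 arithmetic.
    static int fnv1a(String key) {
        int hash = 0x811c9dc5;
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            hash ^= (b & 0xff);
            hash *= 0x01000193;
        }
        return hash;
    }

    public static void main(String[] args) {
        AutoSplitSelectorSketch selector = new AutoSplitSelectorSketch();
        selector.addConsumer("c1");
        String before = selector.select("100"); // only one consumer, so always c1
        selector.addConsumer("c2");
        String after = selector.select("100");  // c1 or c2, depending on the hash
        System.out.println("key 100: " + before + " -> " + after);
    }
}
```

In this model, every key whose slot falls in the half taken over by the second consumer switches owners at the moment it joins, even though the first consumer never left. Messages of such a key received before and after the join therefore land on different consumers, which is the behavior Sijie describes when consumers are added one by one.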
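Sijie's "sticky" suggestion means each consumer is given a fixed key range up front instead of having ranges split as consumers join. The hypothetical helper below only computes such ranges: N contiguous, non-overlapping inclusive ranges covering a hash space assumed to be 65536 slots. In the Pulsar Java client, each range would then be attached to its consumer through the consumer builder's Key_Shared policy, something like KeySharedPolicy.stickyHashRange().ranges(Range.of(start, end)); treat those API names as an assumption and check them against the client version in use.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits an ASSUMED hash space of 65536 slots into
// `consumers` fixed, contiguous, non-overlapping inclusive ranges,
// one per sticky consumer.
class StickyRangeSketch {
    static final int HASH_SPACE = 1 << 16;

    static List<int[]> fixedRanges(int consumers) {
        if (consumers < 1 || consumers > HASH_SPACE) {
            throw new IllegalArgumentException("consumers: " + consumers);
        }
        List<int[]> ranges = new ArrayList<>();
        int base = HASH_SPACE / consumers;
        int extra = HASH_SPACE % consumers;
        int start = 0;
        for (int i = 0; i < consumers; i++) {
            int width = base + (i < extra ? 1 : 0); // spread the remainder over the first ranges
            ranges.add(new int[] {start, start + width - 1});
            start += width;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Prints 0..21845, 21846..43690, 43691..65535
        for (int[] r : fixedRanges(3)) {
            System.out.println(r[0] + ".." + r[1]);
        }
    }
}
```

The ranges cover every slot exactly once, so each key has exactly one owner, and, unlike the auto-split case, those owners stay fixed as long as the ranges are not reassigned.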
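To put the skew in numbers, the hypothetical helper below sums the per-consumer counts from Hiroyuki's table (copied verbatim from the email). All 10000 messages are accounted for across 20 consumers, so an even split would give 500 messages each; b4fa8 received 4243, roughly 8.5 times that, while the smallest count (138, for 284ca) is under a third of it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: per-consumer counts from the table in the original
// email, summarized against a perfectly even split across all consumers.
class SkewReport {
    static Map<String, Integer> reportedCounts() {
        String[] names = {"15588", "21dcb", "284ca", "29138", "30276", "500ae", "5c4cb",
                "654bf", "699fe", "6c6c8", "73691", "7f5d1", "96ee9", "9b55d", "a22fd",
                "a752a", "b4fa8", "cc11a", "d0f5d", "ec8b8"};
        int[] counts = {289, 346, 138, 379, 195, 533, 375, 338, 194, 271,
                220, 152, 372, 359, 315, 196, 4243, 387, 307, 391};
        Map<String, Integer> result = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) result.put(names[i], counts[i]);
        return result;
    }

    static int total(Map<String, Integer> counts) {
        int sum = 0;
        for (int n : counts.values()) sum += n;
        return sum;
    }

    // Consumer with the largest message count.
    static String busiest(Map<String, Integer> counts) {
        String best = null;
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (best == null || e.getValue() > counts.get(best)) best = e.getKey();
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = reportedCounts();
        int total = total(counts);
        double evenShare = (double) total / counts.size();
        String top = busiest(counts);
        System.out.printf("total=%d, even share=%.0f, busiest=%s with %d (%.1fx even share)%n",
                total, evenShare, top, counts.get(top), counts.get(top) / evenShare);
    }
}
```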
