Hello PengHui,

Thank you for the reply and the investigation.
Ah, ok, I didn't know that it triggers the range split even if the consumer
threads are created within a short period of time.
Sorry, I had misunderstood the behavior.
Thank you again. It helps a lot.

Best,
Hiroyuki

On Fri, Mar 27, 2020 at 12:57 AM PengHui Li <[email protected]> wrote:
>
> Hi Hiroyuki,
>
> I have run your tests many times. If I start the consumer first and ensure
> there is no backlog for the subscription, then start the producer to publish
> messages, the result is always correct.
> For easier viewing, I added a TreeSet to store
> "msg.getKey() + " -> " + consumer.getConsumerName()", and for every 100
> messages I print the tree set. The following is the result:
>
> ```
> 939 -> 79ac9
> 94 -> e5d9a
> 940 -> f6cfd
> 941 -> faf34
> 942 -> faf34
> 943 -> ccc34
> 943 -> f6cfd
> 944 -> ccc34
> 945 -> e3e44
> 946 -> e0e68
> 947 -> 79ac9
> 948 -> 1ca5d
> 949 -> ccc34
> 95 -> 1ca5d
> 950 -> 42635
> 951 -> f6cfd
> 952 -> e955a
> 953 -> e955a
> 954 -> 22e5a
> 955 -> 42635
> 956 -> 22e5a
> 957 -> e3e44
> 958 -> 26fdb
> 959 -> 16293
> 96 -> f69f0
> 960 -> e955a
> 961 -> bf9f5
> 962 -> f6cfd
> 963 -> ccc34
> 964 -> e3e44
> 965 -> 2970b
> 966 -> 16293
> 967 -> 16293
> 968 -> 26fdb
> 969 -> 1ca5d
> 97 -> 1ca5d
> 970 -> 53ec0
> 971 -> 22e5a
> 972 -> e0e68
> 973 -> f69f0
> 974 -> 79ac9
> 975 -> f6cfd
> 976 -> bf9f5
> 977 -> 2970b
> 977 -> 95de2
> 978 -> 4eecf
> 979 -> 22e5a
> 98 -> 1ca5d
> 980 -> f6cfd
> 981 -> 2970b
> 982 -> e955a
> 983 -> f6cfd
> 984 -> ccc34
> 985 -> bf9f5
> 986 -> ccc34
> 986 -> f6cfd
> 987 -> 1ca5d
> 988 -> ccc34
> 989 -> faf34
> 99 -> 79ac9
> 990 -> 16293
> 991 -> f6cfd
> 992 -> 4eecf
> 993 -> e3e44
> 994 -> e5d9a
> 995 -> bf9f5
> 996 -> 2970b
> 997 -> 22e5a
> 998 -> e955a
> 999 -> 16293
> =====================================
> ```
>
> I copied only part of the log.
>
> If I start the producer first (the subscription is already created), the
> problem occurs.
> I think this is what Sijie said: the consumers are created one by one, so
> the hash range is constantly being split.
>
> Hope this helps.
>
> Thanks,
> Penghui
>
>
> On Thu, Mar 26, 2020 at 9:58 AM Hiroyuki Yamada <[email protected]> wrote:
>>
>> Hi,
>>
>> Thank you Sijie
>>
>> > > If there is a new consumer joining a subscription, the key distribution
>> > > will be split. So you can expect to see the messages of a key being
>> > > dispatched from one consumer to another consumer. If the consumers of a
>> > > subscription are stable, you will see the messages of the same key are
>> > > always dispatched to one consumer.
>>
>> Then, it is not working as expected.
>> I think the consumers of a subscription are stable since I just ran this:
>> https://github.com/feeblefakie/misc/blob/master/pulsar/src/main/java/MyConsumer.java
>>
>> But the messages of the same key are not going to the same consumer, as
>> shown in the previous email.
>> Did I miss something?
>>
>> Thanks,
>> Hiroyuki
>>
>> On Thu, Mar 26, 2020 at 9:39 AM Sijie Guo <[email protected]> wrote:
>> >
>> > Hi Hiroyuki,
>> >
>> > > First, multiple messages with the same key go to the same consumer
>> > thread as described below.
>> >
>> > If there is a new consumer joining a subscription, the key distribution
>> > will be split. So you can expect to see the messages of a key being
>> > dispatched from one consumer to another consumer. If the consumers of a
>> > subscription are stable, you will see the messages of the same key are
>> > always dispatched to one consumer.
>> >
>> > You can consider using a "sticky" consumer to consume messages. This
>> > ensures messages of a given key range always go to that consumer.
>> >
>> > > Second, messages are almost equally balanced between consumer threads.
>> >
>> > The message distribution relies heavily on the key hashing. It is based on
>> > the Murmur3 hash. I think the skewness is coming from this part.
>> > One way to improve is to split the key range based on traffic instead of
>> > splitting equally when a new consumer joins, similar to what we did
>> > recently for bundle split.
>> > I have cc'ed Penghui in this email to see what
>> > his thoughts are about this.
>> >
>> > - Sijie
>> >
>> > On Wed, Mar 25, 2020 at 3:43 AM Hiroyuki Yamada <[email protected]> wrote:
>> >>
>> >> Hi,
>> >>
>> >> I'm checking the Key_Shared feature and faced some unexpected behavior.
>> >> Roughly, what happened is that multiple messages with the same key go to
>> >> different consumers (consumer threads) even though Key_Shared is set.
>> >> Also, the distribution of messages to consumers is quite skewed.
>> >>
>> >> Let me explain what I did, what happened, and what my expectation was
>> >> in more detail.
>> >>
>> >> What I did:
>> >> 1. Created a partitioned topic named
>> >> "persistent://my-tenant/my-namespace/my-topic8" (# of partitions is 8)
>> >> 2. Ran a producer (the code is here:
>> >> https://github.com/feeblefakie/misc/blob/master/pulsar/src/main/java/MyProducer.java
>> >> )
>> >>    - It just produces 10000 messages with SinglePartition. Each message
>> >> has a key (from "0" to "999").
>> >> 3. Ran a consumer process (the code is here:
>> >> https://github.com/feeblefakie/misc/blob/master/pulsar/src/main/java/MyConsumer.java
>> >> )
>> >>    - It consumes messages with Key_Shared and 20 consumer threads set
>> >> by ClientBuilder.listenerThreads(20).
>> >>
>> >> What happened:
>> >> Multiple messages with the same key go to different consumer threads.
>> >> For example, the following is a set of messages with key "100", but
>> >> some go to b4fa8 and others go to 96ee9.
>> >>
>> >> #ConsumerName key
>> >> b4fa8 100
>> >> b4fa8 100
>> >> b4fa8 100
>> >> b4fa8 100
>> >> 96ee9 100
>> >> 96ee9 100
>> >> 96ee9 100
>> >> 96ee9 100
>> >> 96ee9 100
>> >> 96ee9 100
>> >>
>> >> Also, the distribution of messages is pretty skewed.
>> >>
>> >> # numOfMessages consumerName
>> >> 289 15588
>> >> 346 21dcb
>> >> 138 284ca
>> >> 379 29138
>> >> 195 30276
>> >> 533 500ae
>> >> 375 5c4cb
>> >> 338 654bf
>> >> 194 699fe
>> >> 271 6c6c8
>> >> 220 73691
>> >> 152 7f5d1
>> >> 372 96ee9
>> >> 359 9b55d
>> >> 315 a22fd
>> >> 196 a752a
>> >> 4243 b4fa8 <- this consumes most of the messages
>> >> 387 cc11a
>> >> 307 d0f5d
>> >> 391 ec8b8
>> >>
>> >> What I expected:
>> >>
>> >> First, multiple messages with the same key go to the same consumer
>> >> thread as described below.
>> >> http://pulsar.apache.org/docs/en/concepts-messaging/#key_shared
>> >> Second, messages are almost equally balanced between consumer threads.
>> >>
>> >> So, am I doing something wrong, or is my expectation wrong?
>> >> If my expectation is correct, it would be great if anyone could teach me
>> >> how to make it happen.
>> >>
>> >> Best regards,
>> >> Hiroyuki

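As a rough illustration of the behavior discussed in this thread (the hash range being split each time a consumer joins, so a key can move from one consumer to another), here is a minimal, self-contained Java sketch. It is a simplified model and not Pulsar's actual implementation: the class name HashRangeSplitDemo, its methods, the 65536-slot hash space, the "split the largest range in half" rule, and the use of String.hashCode() as a stand-in for Murmur3 are all assumptions made for illustration.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.TreeMap;

// Simplified model only; this is NOT Pulsar's real consumer selector.
class HashRangeSplitDemo {
    // Assumed size of the hash space (hypothetical value for this sketch).
    static final int RANGE_SIZE = 65536;

    // Exclusive upper bound of a range -> consumer owning [previous bound, this bound).
    private final TreeMap<Integer, String> owners = new TreeMap<>();

    // A joining consumer takes the upper half of the currently largest range.
    public void addConsumer(String name) {
        if (owners.isEmpty()) {
            owners.put(RANGE_SIZE, name); // the first consumer owns everything
            return;
        }
        int prev = 0, bestStart = 0, bestEnd = 0;
        for (int end : owners.keySet()) {
            if (end - prev > bestEnd - bestStart) {
                bestStart = prev;
                bestEnd = end;
            }
            prev = end;
        }
        if (bestEnd - bestStart < 2) {
            throw new IllegalStateException("no range left to split");
        }
        int mid = bestStart + (bestEnd - bestStart) / 2;
        String existing = owners.get(bestEnd);
        owners.put(mid, existing); // existing consumer keeps [bestStart, mid)
        owners.put(bestEnd, name); // new consumer takes [mid, bestEnd)
    }

    // Maps a key to a slot and returns the consumer whose range contains it.
    // String.hashCode() is used here instead of Murmur3 (assumption).
    // At least one consumer must have been added before calling this.
    public String select(String key) {
        int slot = Math.floorMod(key.hashCode(), RANGE_SIZE);
        return owners.higherEntry(slot).getValue();
    }

    public static void main(String[] args) {
        HashRangeSplitDemo selector = new HashRangeSplitDemo();
        Set<String> ownersOfKey = new LinkedHashSet<>();
        // Consumers join one by one, as in the "producer started first" case.
        for (int i = 0; i < 20; i++) {
            selector.addConsumer("consumer-" + i);
            ownersOfKey.add(selector.select("100"));
        }
        System.out.println("Owners of key \"100\" while consumers joined: " + ownersOfKey);
    }
}
```

In this model a key changes owner only when a split touches the range that contains it. Once the set of consumers stops changing, every key maps to exactly one consumer, which is consistent with what Sijie and Penghui describe above for a stable subscription.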