Hello PengHui,

Thank you for the reply and the investigation.
Ah, OK. I didn't know that the range split is triggered even when the consumer
threads are created within a short period of time.
Sorry, I misunderstood the behavior.

Thank you again. It helps a lot.

Best,
Hiroyuki

On Fri, Mar 27, 2020 at 12:57 AM PengHui Li <[email protected]> wrote:
>
> Hi Hiroyuki,
>
> I have run your tests many times. If I start the consumer first, make sure
> there is no backlog for the subscription, and then start the producer to
> publish messages, the result is always correct. For easier viewing, I added a
> TreeSet to store `msg.getKey() + " -> " + consumer.getConsumerName()` and
> printed the TreeSet every 100 messages. The following is the result:
>
> ```
> 939 -> 79ac9
> 94 -> e5d9a
> 940 -> f6cfd
> 941 -> faf34
> 942 -> faf34
> 943 -> ccc34
> 943 -> f6cfd
> 944 -> ccc34
> 945 -> e3e44
> 946 -> e0e68
> 947 -> 79ac9
> 948 -> 1ca5d
> 949 -> ccc34
> 95 -> 1ca5d
> 950 -> 42635
> 951 -> f6cfd
> 952 -> e955a
> 953 -> e955a
> 954 -> 22e5a
> 955 -> 42635
> 956 -> 22e5a
> 957 -> e3e44
> 958 -> 26fdb
> 959 -> 16293
> 96 -> f69f0
> 960 -> e955a
> 961 -> bf9f5
> 962 -> f6cfd
> 963 -> ccc34
> 964 -> e3e44
> 965 -> 2970b
> 966 -> 16293
> 967 -> 16293
> 968 -> 26fdb
> 969 -> 1ca5d
> 97 -> 1ca5d
> 970 -> 53ec0
> 971 -> 22e5a
> 972 -> e0e68
> 973 -> f69f0
> 974 -> 79ac9
> 975 -> f6cfd
> 976 -> bf9f5
> 977 -> 2970b
> 977 -> 95de2
> 978 -> 4eecf
> 979 -> 22e5a
> 98 -> 1ca5d
> 980 -> f6cfd
> 981 -> 2970b
> 982 -> e955a
> 983 -> f6cfd
> 984 -> ccc34
> 985 -> bf9f5
> 986 -> ccc34
> 986 -> f6cfd
> 987 -> 1ca5d
> 988 -> ccc34
> 989 -> faf34
> 99 -> 79ac9
> 990 -> 16293
> 991 -> f6cfd
> 992 -> 4eecf
> 993 -> e3e44
> 994 -> e5d9a
> 995 -> bf9f5
> 996 -> 2970b
> 997 -> 22e5a
> 998 -> e955a
> 999 -> 16293
> =====================================
> ```
>
> I only copied part of the log.
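The TreeSet logging described above can be sketched roughly as follows. `KeyOwnershipLog` is a hypothetical helper, not the actual test code, and it assumes each received message is recorded as a key plus a consumer name. Because the entries are plain strings, the TreeSet sorts them lexicographically, which is why `94 -> e5d9a` appears between the `939` and `940` entries in the log. The helper also lists keys that were recorded against more than one consumer.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper modeled on the TreeSet logging described above;
// not the actual test code.
final class KeyOwnershipLog {
    // One "key -> consumerName" string per observed (key, consumer) pair.
    private final TreeSet<String> pairs = new TreeSet<>();
    // Every consumer that has received each key.
    private final Map<String, Set<String>> ownersByKey = new TreeMap<>();

    void record(String key, String consumerName) {
        pairs.add(key + " -> " + consumerName);
        ownersByKey.computeIfAbsent(key, k -> new TreeSet<>()).add(consumerName);
    }

    TreeSet<String> pairs() {
        return pairs;
    }

    // Keys that were delivered to more than one consumer.
    Set<String> keysWithMultipleConsumers() {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Set<String>> e : ownersByKey.entrySet()) {
            if (e.getValue().size() > 1) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        KeyOwnershipLog log = new KeyOwnershipLog();
        log.record("939", "79ac9");
        log.record("94", "e5d9a");
        log.record("940", "f6cfd");
        log.record("100", "b4fa8");
        log.record("100", "96ee9");
        // Entries are Strings, so they sort lexicographically:
        // "939 -> ..." < "94 -> ..." < "940 -> ...".
        log.pairs().forEach(System.out::println);
        System.out.println("keys seen on more than one consumer: " + log.keysWithMultipleConsumers());
    }
}
```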
>
> If I start the producer first (with the subscription already created), the
> problem occurs. I think this is what Sijie described: the consumers are
> created one by one, so the hash range is constantly being split.
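A minimal simulation of why consumers joining one at a time keep moving keys. It rests on assumptions, not on Pulsar's code: a 0..65535 hash space, each newcomer taking the upper half of the largest existing range (the "split equally" behavior Sijie mentions), and `String.hashCode()` standing in for the Murmur3 hash. `AutoSplitSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of auto-split key ranges, not Pulsar's implementation.
// Assumptions: a 0..65535 hash space, each newcomer takes the upper half of
// the largest existing range, and String.hashCode() stands in for Murmur3.
final class AutoSplitSketch {
    static final int RANGE_SIZE = 65536;

    // A contiguous, inclusive [start, end] slot range owned by one consumer.
    static final class Range {
        final String consumer;
        final int start;
        final int end;

        Range(String consumer, int start, int end) {
            this.consumer = consumer;
            this.start = start;
            this.end = end;
        }

        int size() {
            return end - start + 1;
        }
    }

    final List<Range> ranges = new ArrayList<>();

    void addConsumer(String name) {
        if (ranges.isEmpty()) {
            // The first consumer owns the whole hash space.
            ranges.add(new Range(name, 0, RANGE_SIZE - 1));
            return;
        }
        // Find the largest range (first one wins on ties).
        Range largest = ranges.get(0);
        for (Range r : ranges) {
            if (r.size() > largest.size()) {
                largest = r;
            }
        }
        if (largest.size() < 2) {
            throw new IllegalStateException("no range left to split");
        }
        // Old owner keeps the lower half, newcomer takes the upper half.
        int lowerEnd = largest.start + largest.size() / 2 - 1;
        ranges.remove(largest);
        ranges.add(new Range(largest.consumer, largest.start, lowerEnd));
        ranges.add(new Range(name, lowerEnd + 1, largest.end));
    }

    // Stand-in hash: String.hashCode(), NOT Pulsar's Murmur3.
    static int slot(String key) {
        return Math.floorMod(key.hashCode(), RANGE_SIZE);
    }

    String ownerOf(String key) {
        int s = slot(key);
        for (Range r : ranges) {
            if (s >= r.start && s <= r.end) {
                return r.consumer;
            }
        }
        throw new IllegalStateException("no consumer owns slot " + s);
    }

    public static void main(String[] args) {
        AutoSplitSketch selector = new AutoSplitSketch();
        // Consumers join one at a time; watch key "100" change owners.
        for (int i = 0; i < 20; i++) {
            selector.addConsumer("c" + i);
            System.out.println("after c" + i + " joined, key 100 -> " + selector.ownerOf("100"));
        }
        for (Range r : selector.ranges) {
            System.out.println(r.consumer + " owns " + r.size() + " slots");
        }
    }
}
```

Under these assumptions, key "100" (slot 48625) moves from c0 to c1 as soon as the second consumer joins, so its messages go to c0 before the join and to c1 after it. After 20 sequential joins, 12 consumers own 4096 slots and 8 own 2048, so the slot shares are uneven by a factor of two even before any hashing skew.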
>
> Hope this helps.
>
> Thanks,
> Penghui
>
>
> On Thu, Mar 26, 2020 at 9:58 AM Hiroyuki Yamada <[email protected]> wrote:
>>
>> Hi,
>>
>> Thank you, Sijie.
>>
>> > > If a new consumer joins a subscription, the key distribution will be
>> > > split, so you should expect messages of a key to be dispatched first to
>> > > one consumer and then to another. If the consumers of a subscription are
>> > > stable, messages with the same key will always be dispatched to the same
>> > > consumer.
>>
>> Then it is not working as expected.
>> I think the consumers of the subscription are stable, since I only ran this:
>> https://github.com/feeblefakie/misc/blob/master/pulsar/src/main/java/MyConsumer.java
>>
>> But messages with the same key are not going to the same consumer, as
>> shown in my previous email.
>> Did I miss something?
>>
>> Thanks,
>> Hiroyuki
>>
>> On Thu, Mar 26, 2020 at 9:39 AM Sijie Guo <[email protected]> wrote:
>> >
>> > Hi Hiroyuki,
>> >
>> > > First, multiple messages with the same key go to the same consumer
>> > > thread as described below.
>> >
>> > If a new consumer joins a subscription, the key distribution will be
>> > split, so you should expect messages of a key to be dispatched first to
>> > one consumer and then to another. If the consumers of a subscription are
>> > stable, messages with the same key will always be dispatched to the same
>> > consumer.
>> >
>> > You could consider using a "sticky" consumer, which ensures that messages
>> > in a given key range always go to that consumer.
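A sketch of the sticky idea, under the assumption that each consumer is handed a fixed slot range up front (here an equal share of an assumed 0..65535 hash space), so a key's owner is a pure function of its hash and does not depend on when consumers connect. As far as I know, the Pulsar Java client exposes this through `KeySharedPolicy.stickyHashRange()` passed to the consumer builder; check the client docs for your version. The code only models the assignment: `StickyRanges` is a hypothetical name and `String.hashCode()` stands in for Murmur3.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of sticky key ranges: each consumer gets a fixed range
// that is agreed up front instead of being split as consumers join.
final class StickyRanges {
    static final int RANGE_SIZE = 65536; // assumed hash space 0..65535

    // Splits the hash space into n contiguous, inclusive [start, end] ranges.
    static List<int[]> equalRanges(int n) {
        List<int[]> result = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            int start = (int) ((long) RANGE_SIZE * i / n);
            int end = (int) ((long) RANGE_SIZE * (i + 1) / n) - 1;
            result.add(new int[] {start, end});
        }
        return result;
    }

    // Stand-in hash: String.hashCode(), NOT Pulsar's Murmur3.
    static int slot(String key) {
        return Math.floorMod(key.hashCode(), RANGE_SIZE);
    }

    // Index of the consumer whose fixed range covers the key's slot.
    static int ownerIndex(String key, List<int[]> ranges) {
        int s = slot(key);
        for (int i = 0; i < ranges.size(); i++) {
            if (s >= ranges.get(i)[0] && s <= ranges.get(i)[1]) {
                return i;
            }
        }
        throw new IllegalStateException("slot " + s + " is not covered");
    }

    public static void main(String[] args) {
        List<int[]> ranges = equalRanges(20);
        // Ownership depends only on the fixed ranges, not on connection order.
        System.out.println("key 100 -> consumer " + ownerIndex("100", ranges));
    }
}
```

With 20 equal ranges, key "100" (slot 48625 under the stand-in hash) always maps to consumer index 14, regardless of join order. The trade-off is that the application has to choose the ranges and keep them consistent.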
>> >
>> > > Second, messages are almost equally balanced between consumer threads.
>> >
>> > The message distribution relies heavily on key hashing, which is based on
>> > the Murmur3 hash. I think the skew comes from this part.
>> > One way to improve it is to split the key range based on traffic instead of
>> > splitting it equally when a new consumer joins, similar to what we did
>> > recently for bundle splitting. I have cc'ed Penghui on this email to hear
>> > his thoughts on this.
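A sketch of the traffic-based split Sijie suggests, assuming per-slot message counts are available (whether the broker tracks these is an assumption here). Rather than cutting a range at its midpoint, the split point is the first slot where the cumulative count reaches half of the range's traffic. `TrafficSplit.splitPoint` is a hypothetical name.

```java
// Hypothetical traffic-weighted split of an inclusive slot range [start, end].
final class TrafficSplit {

    // Returns the last slot of the lower half: the first slot at which the
    // cumulative message count reaches half of the range's total traffic,
    // capped so the upper half keeps at least one slot. Falls back to the
    // midpoint when the range has no recorded traffic.
    static int splitPoint(long[] countsPerSlot, int start, int end) {
        if (end <= start) {
            throw new IllegalArgumentException("range must contain at least two slots");
        }
        long total = 0;
        for (int s = start; s <= end; s++) {
            total += countsPerSlot[s];
        }
        if (total == 0) {
            return start + (end - start) / 2;
        }
        long running = 0;
        for (int s = start; s < end; s++) {
            running += countsPerSlot[s];
            if (2 * running >= total) {
                return s;
            }
        }
        return end - 1;
    }

    public static void main(String[] args) {
        // Toy 8-slot space where slot 0 alone carries half of the messages.
        long[] counts = {50, 10, 10, 10, 5, 5, 5, 5};
        int last = counts.length - 1;
        System.out.println("equal split: lower half ends at slot " + last / 2);
        System.out.println("traffic split: lower half ends at slot " + splitPoint(counts, 0, last));
    }
}
```

With the toy counts, an equal split puts 80 of the 100 messages in the lower half (slots 0 to 3), whereas the traffic-weighted split puts 50 on each side. With uniform counts it picks the midpoint.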
>> >
>> > - Sijie
>> >
>> > On Wed, Mar 25, 2020 at 3:43 AM Hiroyuki Yamada <[email protected]> wrote:
>> >>
>> >> Hi,
>> >>
>> >> I'm testing the Key_Shared feature and ran into some unexpected behavior.
>> >> Roughly, multiple messages with the same key go to different consumers
>> >> (consumer threads) even though Key_Shared is set.
>> >> Also, the distribution of messages across consumers is quite skewed.
>> >>
>> >> Let me explain in more detail what I did, what happened, and what I
>> >> expected.
>> >>
>> >> What I did:
>> >> 1. Created a partitioned topic named
>> >> "persistent://my-tenant/my-namespace/my-topic8" (8 partitions).
>> >> 2. Ran a producer; the code is here:
>> >> https://github.com/feeblefakie/misc/blob/master/pulsar/src/main/java/MyProducer.java
>> >>   - It produces 10000 messages with the SinglePartition routing mode.
>> >>     Each message has a key from "0" to "999".
>> >> 3. Ran a consumer process; the code is here:
>> >> https://github.com/feeblefakie/misc/blob/master/pulsar/src/main/java/MyConsumer.java
>> >>   - It consumes messages with Key_Shared, using 20 consumer threads set
>> >>     by ClientBuilder.listenerThreads(20).
>> >>
>> >> What happened:
>> >> Multiple messages with the same key go to different consumer threads.
>> >> For example, the following messages all have the key "100", but some go
>> >> to b4fa8 and others go to 96ee9.
>> >>
>> >> #ConsumerName key
>> >> b4fa8 100
>> >> b4fa8 100
>> >> b4fa8 100
>> >> b4fa8 100
>> >> 96ee9 100
>> >> 96ee9 100
>> >> 96ee9 100
>> >> 96ee9 100
>> >> 96ee9 100
>> >> 96ee9 100
>> >>
>> >> Also, the distribution of messages is pretty skewed.
>> >>
>> >> # numOfMessages consumerName
>> >>  289 15588
>> >>  346 21dcb
>> >>  138 284ca
>> >>  379 29138
>> >>  195 30276
>> >>  533 500ae
>> >>  375 5c4cb
>> >>  338 654bf
>> >>  194 699fe
>> >>  271 6c6c8
>> >>  220 73691
>> >>  152 7f5d1
>> >>  372 96ee9
>> >>  359 9b55d
>> >>  315 a22fd
>> >>  196 a752a
>> >> 4243 b4fa8 <- this one consumes by far the most messages
>> >>  387 cc11a
>> >>  307 d0f5d
>> >>  391 ec8b8
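To put a number on the skew: 20 consumers sharing 10,000 messages evenly would get 500 each, while b4fa8 received 4,243, about 42% of the total and roughly 8.5 times that average. The snippet below just redoes this arithmetic from the table; `SkewStats` is an illustrative name.

```java
// Recomputes the skew from the per-consumer counts in the table above.
final class SkewStats {
    static final String[] NAMES = {
        "15588", "21dcb", "284ca", "29138", "30276", "500ae", "5c4cb", "654bf", "699fe", "6c6c8",
        "73691", "7f5d1", "96ee9", "9b55d", "a22fd", "a752a", "b4fa8", "cc11a", "d0f5d", "ec8b8"
    };
    static final int[] COUNTS = {
        289, 346, 138, 379, 195, 533, 375, 338, 194, 271,
        220, 152, 372, 359, 315, 196, 4243, 387, 307, 391
    };

    // Total number of messages consumed across all consumers.
    static int total() {
        int sum = 0;
        for (int c : COUNTS) {
            sum += c;
        }
        return sum;
    }

    // Index of the consumer that received the most messages.
    static int busiestIndex() {
        int best = 0;
        for (int i = 1; i < COUNTS.length; i++) {
            if (COUNTS[i] > COUNTS[best]) {
                best = i;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        int total = total();
        double mean = (double) total / COUNTS.length;
        int i = busiestIndex();
        System.out.printf("total=%d, mean per consumer=%.1f%n", total, mean);
        System.out.printf("%s received %d (%.1f%% of all, %.2fx the mean)%n",
                NAMES[i], COUNTS[i], 100.0 * COUNTS[i] / total, COUNTS[i] / mean);
    }
}
```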
>> >>
>> >> What I expected:
>> >>
>> >> First, multiple messages with the same key go to the same consumer
>> >> thread, as described here:
>> >> http://pulsar.apache.org/docs/en/concepts-messaging/#key_shared
>> >> Second, messages are roughly evenly balanced between consumer threads.
>> >>
>> >> So, am I doing something wrong, or is my expectation wrong?
>> >> If my expectation is correct, it would be great if someone could tell me
>> >> how to make it happen.
>> >>
>> >> Best regards,
>> >> Hiroyuki
