Hi Hiroyuki,

I have run your tests many times. If I start the consumer first, make sure
the subscription has no backlog, and then start the producer to publish
messages, the result is always correct. To make the output easier to read, I
added a TreeSet that stores `msg.getKey() + " -> " + consumer.getConsumerName()`
and printed the set every 100 messages. Here is the result:

```
939 -> 79ac9
94 -> e5d9a
940 -> f6cfd
941 -> faf34
942 -> faf34
943 -> ccc34
943 -> f6cfd
944 -> ccc34
945 -> e3e44
946 -> e0e68
947 -> 79ac9
948 -> 1ca5d
949 -> ccc34
95 -> 1ca5d
950 -> 42635
951 -> f6cfd
952 -> e955a
953 -> e955a
954 -> 22e5a
955 -> 42635
956 -> 22e5a
957 -> e3e44
958 -> 26fdb
959 -> 16293
96 -> f69f0
960 -> e955a
961 -> bf9f5
962 -> f6cfd
963 -> ccc34
964 -> e3e44
965 -> 2970b
966 -> 16293
967 -> 16293
968 -> 26fdb
969 -> 1ca5d
97 -> 1ca5d
970 -> 53ec0
971 -> 22e5a
972 -> e0e68
973 -> f69f0
974 -> 79ac9
975 -> f6cfd
976 -> bf9f5
977 -> 2970b
977 -> 95de2
978 -> 4eecf
979 -> 22e5a
98 -> 1ca5d
980 -> f6cfd
981 -> 2970b
982 -> e955a
983 -> f6cfd
984 -> ccc34
985 -> bf9f5
986 -> ccc34
986 -> f6cfd
987 -> 1ca5d
988 -> ccc34
989 -> faf34
99 -> 79ac9
990 -> 16293
991 -> f6cfd
992 -> 4eecf
993 -> e3e44
994 -> e5d9a
995 -> bf9f5
996 -> 2970b
997 -> 22e5a
998 -> e955a
999 -> 16293
=====================================
```

I only copied part of the log.
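The TreeSet instrumentation described above can be sketched roughly as follows. This is a hypothetical helper, not the code actually added to the test: the class name `KeyConsumerLog` and its methods are made up for illustration. In a real consumer, `record` would be called with `msg.getKey()` and `consumer.getConsumerName()` for every received message. The sketch also adds a small check that lists keys delivered to more than one consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper (illustration only): collects "key -> consumerName"
// entries in a sorted set and snapshots the set every 100 messages.
class KeyConsumerLog {
    private final TreeSet<String> entries = new TreeSet<>();
    private final List<String> dumps = new ArrayList<>();
    private int count = 0;

    // Call once per received message, e.g. record(msg.getKey(), consumer.getConsumerName()).
    synchronized void record(String key, String consumerName) {
        entries.add(key + " -> " + consumerName);
        count++;
        if (count % 100 == 0) {
            // Snapshot of all entries seen so far, followed by a separator line.
            dumps.add(String.join("\n", entries) + "\n=====================================");
        }
    }

    synchronized List<String> dumps() {
        return new ArrayList<>(dumps);
    }

    // Returns each key that was delivered to two or more distinct consumers.
    synchronized Map<String, Set<String>> keysWithMultipleConsumers() {
        Map<String, Set<String>> byKey = new TreeMap<>();
        for (String e : entries) {
            int sep = e.indexOf(" -> ");
            byKey.computeIfAbsent(e.substring(0, sep), k -> new TreeSet<>())
                 .add(e.substring(sep + 4));
        }
        byKey.values().removeIf(consumers -> consumers.size() < 2);
        return byKey;
    }
}
```

Because a `TreeSet<String>` sorts its entries lexicographically rather than numerically, "94 -> e5d9a" lands between the 939 and 940 entries in the log above. Any key that shows up with two different consumer names would be reported by `keysWithMultipleConsumers`.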

If I start the producer first (with the subscription already created), the
problem occurs. I think this is what Sijie described: the consumers are
created one by one, so the hash range is constantly being split.
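To illustrate why keys move between consumers when consumers join one at a time, here is a deliberately simplified model of an auto-split hash range. The assumptions are mine, not taken from Pulsar's source: the hash space has 65536 slots, each consumer owns one contiguous range, a joining consumer takes the upper half of the current largest range (ties go to the lowest start), and `String.hashCode` stands in for the Murmur3 hash the broker uses. The class `AutoSplitRanges` is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified, hypothetical model of an auto-split hash range (not Pulsar's code).
class AutoSplitRanges {
    static final int RANGE_SIZE = 1 << 16; // assumed size of the hash space

    // range start (inclusive) -> owning consumer; a range ends where the next one starts
    private final TreeMap<Integer, String> starts = new TreeMap<>();

    void addConsumer(String name) {
        if (starts.isEmpty()) {
            starts.put(0, name); // the first consumer owns the whole space
            return;
        }
        // Find the largest range; on ties keep the one with the lowest start.
        int bestStart = -1;
        int bestLen = -1;
        for (Map.Entry<Integer, String> e : starts.entrySet()) {
            int len = end(e.getKey()) - e.getKey();
            if (len > bestLen) {
                bestLen = len;
                bestStart = e.getKey();
            }
        }
        if (bestLen < 2) throw new IllegalStateException("hash range exhausted");
        // The new consumer takes the upper half; the old owner keeps the lower half.
        starts.put(bestStart + bestLen / 2, name);
    }

    private int end(int start) {
        Integer next = starts.higherKey(start);
        return next == null ? RANGE_SIZE : next;
    }

    // Stand-in for the broker's Murmur3 key hash.
    static int slot(String key) {
        return (key.hashCode() & Integer.MAX_VALUE) % RANGE_SIZE;
    }

    String ownerOf(String key) {
        return starts.floorEntry(slot(key)).getValue();
    }

    // Total number of slots owned by a consumer.
    int rangeLength(String consumer) {
        int total = 0;
        for (Map.Entry<Integer, String> e : starts.entrySet()) {
            if (e.getValue().equals(consumer)) total += end(e.getKey()) - e.getKey();
        }
        return total;
    }
}
```

In this model, a key that hashes into the upper half of the space belongs to the first consumer until the second one joins, and to the second consumer afterwards, so any backlog for that key changes owner mid-stream. Splitting by halves also means that, in this model, 20 consumers end up with 12 owning 4096 slots and 8 owning 2048.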

Hope this helps.

Thanks,
Penghui


On Thu, Mar 26, 2020 at 9:58 AM Hiroyuki Yamada <[email protected]> wrote:

> Hi,
>
> Thank you Sijie
>
> > > When a new consumer joins a subscription, the key distribution is split,
> > > so you should expect messages of a key to move from one consumer to
> > > another. If the consumers of a subscription are stable, messages with
> > > the same key are always dispatched to the same consumer.
>
> Then, it is not working as expected.
> I think the consumers of the subscription are stable, since I just ran this:
>
> https://github.com/feeblefakie/misc/blob/master/pulsar/src/main/java/MyConsumer.java
>
> But messages with the same key are not going to the same consumer, as
> shown in my previous email.
> Did I miss something?
>
> Thanks,
> Hiroyuki
>
> On Thu, Mar 26, 2020 at 9:39 AM Sijie Guo <[email protected]> wrote:
> >
> > Hi Hiroyuki,
> >
> > > First, multiple messages with the same key go to the same consumer
> > > thread as described below.
> >
> > When a new consumer joins a subscription, the key distribution is split,
> > so you should expect messages of a key to move from one consumer to
> > another. If the consumers of a subscription are stable, messages with the
> > same key are always dispatched to the same consumer.
> >
> > You could consider using a "sticky" consumer to consume messages. This
> > ensures that messages in a given key range always go to that consumer.
> >
> > > Second, messages are almost equally balanced between consumer threads.
> >
> > The message distribution relies heavily on key hashing, which is based on
> > the Murmur3 hash. I think the skew is coming from this part.
> > One way to improve this is to split the key range based on traffic instead
> > of splitting it equally when a new consumer joins, similar to what we did
> > recently for bundle splitting. I have cc'ed Penghui on this email to get
> > his thoughts on this.
> >
> > - Sijie
> >
> > On Wed, Mar 25, 2020 at 3:43 AM Hiroyuki Yamada <[email protected]> wrote:
> >>
> >> Hi,
> >>
> >> I'm testing the Key_Shared feature and ran into some unexpected behavior.
> >> Roughly, multiple messages with the same key go to different consumers
> >> (consumer threads) even though Key_Shared is set.
> >> Also, the distribution of messages across consumers is quite skewed.
> >>
> >> Let me explain in more detail what I did, what happened, and what I
> >> expected.
> >>
> >> What I did:
> >> 1. Created a partitioned topic named
> >>    "persistent://my-tenant/my-namespace/my-topic8" with 8 partitions.
> >> 2. Ran a producer. The code is here:
> >>    https://github.com/feeblefakie/misc/blob/master/pulsar/src/main/java/MyProducer.java
> >>    - It just produces 10000 messages with SinglePartition routing. Each
> >>      message has a key (from "0" to "999").
> >> 3. Ran a consumer process. The code is here:
> >>    https://github.com/feeblefakie/misc/blob/master/pulsar/src/main/java/MyConsumer.java
> >>    - It consumes messages with Key_Shared and 20 consumer threads, set
> >>      by ClientBuilder.listenerThreads(20).
> >>
> >> What happened:
> >> Multiple messages with the same key go to different consumer threads.
> >> For example, the following messages all have key "100", but some go
> >> to b4fa8 and others go to 96ee9.
> >>
> >> #ConsumerName key
> >> b4fa8 100
> >> b4fa8 100
> >> b4fa8 100
> >> b4fa8 100
> >> 96ee9 100
> >> 96ee9 100
> >> 96ee9 100
> >> 96ee9 100
> >> 96ee9 100
> >> 96ee9 100
> >>
> >> Also, the distribution of messages is pretty skewed.
> >>
> >> # numOfMessages consumerName
> >>  289 15588
> >>  346 21dcb
> >>  138 284ca
> >>  379 29138
> >>  195 30276
> >>  533 500ae
> >>  375 5c4cb
> >>  338 654bf
> >>  194 699fe
> >>  271 6c6c8
> >>  220 73691
> >>  152 7f5d1
> >>  372 96ee9
> >>  359 9b55d
> >>  315 a22fd
> >>  196 a752a
> >> 4243 b4fa8 <- this consumes most of the messages
> >>  387 cc11a
> >>  307 d0f5d
> >>  391 ec8b8
> >>
> >> What I expected:
> >>
> >> First, multiple messages with the same key go to the same consumer
> >> thread, as described below:
> >> http://pulsar.apache.org/docs/en/concepts-messaging/#key_shared
> >> Second, messages are almost equally balanced between consumer threads.
> >>
> >> So, am I doing something wrong, or is my expectation wrong?
> >> If my expectation is correct, it would be great if someone could show me
> >> how to make it happen.
> >>
> >> Best regards,
> >> Hiroyuki
>
