How many partitions does the topic have? Could you run --describe on it and share the output? Thanks.
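(If running the CLI tools against MSK is inconvenient, a minimal kafka-python sketch can report the partition count as well; the broker address and topic name below are placeholders:

    from kafka import KafkaConsumer

    # Placeholder broker and topic names. partitions_for_topic() returns
    # the set of partition ids known for the topic, or None if unknown.
    consumer = KafkaConsumer(bootstrap_servers='broker:9092')
    partitions = consumer.partitions_for_topic('my-topic')
    print(len(partitions), sorted(partitions))
    consumer.close()
)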
On Thu, Jul 1, 2021, 13:31 Marcus Schäfer <marcus.schae...@gmail.com> wrote:

> Hi,
>
> > Your first understanding is correct, provided each "consumer" means a
> > "consumer thread"
>
> All right, thanks.
>
> > IMO, the second understanding about message distribution is incorrect,
> > because there is something called max poll records for each consumer.
> > It's 500 by default. And the time between two polls is also very small,
> > a few milliseconds. That's why this is happening.
> >
> > You may need to try this on a large number of messages so that other
> > partitions get assigned.
>
> Ah, ok, so you are saying the partition assignment depends on the load
> of the topic? This was new to me, as I thought Kafka distributes the
> messages between the active consumers independent of the amount of
> data. Is there documentation available that explains how this
> correlates?
>
> If one poll() fetches them all, it's clear to me that there is not
> much left to distribute, and if the subsequent poll() happens right
> after the former one, I can imagine it stays at the same partition.
> Thanks for pointing this out.
>
> So I changed the code and used:
>
>     max_poll_records=1
>
> That means one poll() for each message. I added a sleep time of 1 sec
> between the polls and started two consumers (read.py) with some delay,
> such that there are a number of poll()s from the two consumers with
> different timing.
>
> From this situation I tested producing messages:
>
>     20
>     200
>     2000
>     200000
>
> There was no case in which the messages got distributed between the
> two consumers. It was always one receiving them. If you closed one of
> the consumers, the other continued receiving the messages.
>
> I must admit I did not wait forever (1 sec between polls is long :-)),
> but I also think it wouldn't have changed while processing.
>
> > I tried my best to participate in the discussion; I am not an expert
> > though 😊
>
> Thanks much for looking into this. I still think something is wrong in
> either my testing/code or on the cluster. Maybe some details on the
> Kafka setup will help.
>
> This is the server setup:
>
>     auto.create.topics.enable=false
>     default.replication.factor=3
>     min.insync.replicas=2
>     num.io.threads=8
>     num.network.threads=5
>     num.partitions=20
>     num.replica.fetchers=2
>     replica.lag.time.max.ms=30000
>     socket.receive.buffer.bytes=102400
>     socket.request.max.bytes=104857600
>     socket.send.buffer.bytes=102400
>     unclean.leader.election.enable=true
>     zookeeper.session.timeout.ms=18000
>
> This is the Kafka version:
>
>     "CurrentBrokerSoftwareInfo": {
>         "KafkaVersion": "2.6.1"
>     }
>
> Do you see any setting that impacts the dynamic partition assignment?
>
> I was reading about the api_version exchange and that it can impact
> the availability of features:
>
>     (0, 9) enables full group coordination features with automatic
>         partition assignment and rebalancing,
>     (0, 8, 2) enables kafka-storage offset commits with manual
>         partition assignment only,
>     (0, 8, 1) enables zookeeper-storage offset commits with manual
>         partition assignment only,
>     (0, 8, 0) enables basic functionality but requires manual
>         partition assignment and offset management.
> I was not able to find out, though, which api_version the server as
> set up by Amazon MSK speaks.
>
> Thanks
>
> Regards,
> Marcus
> --
> Public Key available via: https://keybase.io/marcus_schaefer/key.asc
> keybase search marcus_schaefer
> -------------------------------------------------------
>  Marcus Schäfer                  Am Unterösch 9
>  Tel: +49 7562 905437            D-88316 Isny / Rohrdorf
>                                  Germany
> -------------------------------------------------------
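For reference, the read.py poll loop described above might look roughly
like the following kafka-python sketch; the topic, group id, and broker
address are placeholders, and subscribing with a group_id is what enables
automatic partition assignment:

    import time
    from kafka import KafkaConsumer

    # Placeholder names throughout. Both consumers must use the same
    # group_id to join the same consumer group.
    consumer = KafkaConsumer(
        'my-topic',
        bootstrap_servers='broker:9092',
        group_id='my-group',
        max_poll_records=1,       # at most one record per poll()
    )

    while True:
        # poll() returns a dict mapping TopicPartition -> [ConsumerRecord]
        records = consumer.poll(timeout_ms=1000)
        for tp, messages in records.items():
            for message in messages:
                print('partition %d: %r' % (tp.partition, message.value))
        time.sleep(1)             # 1 sec between polls, as in the test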
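As for the open api_version question: kafka-python can probe a broker
and guess its version. A sketch, assuming the async KafkaClient and a
placeholder broker address:

    from kafka.client_async import KafkaClient

    # check_version() connects to a broker and returns its best guess
    # at the broker version as a tuple, e.g. (2, 6, 0).
    client = KafkaClient(bootstrap_servers='broker:9092')
    print(client.check_version())
    client.close()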