How many partitions do you have for the topic? Can you run --describe
on it and share the output, please? Thanks
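
If the kafka-topics CLI is not at hand, the partition count can also be
read from Python. A minimal sketch, assuming kafka-python (which read.py
appears to use); broker address and topic name are placeholders:

    from kafka import KafkaConsumer

    # Placeholder broker address and topic name.
    consumer = KafkaConsumer(bootstrap_servers="broker:9092")
    print(consumer.partitions_for_topic("my-topic"))  # set of partition ids
    consumer.close()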

On Thu, Jul 1, 2021, 13:31 Marcus Schäfer <
marcus.schae...@gmail.com>:

> Hi,
>
> > Your first understanding is correct, provided each “consumer” means a
> > “consumer thread”
>
> all right, thanks
>
> > IMO, the second understanding about message distribution is incorrect
> > because there is something called max poll records for each consumer.
> > It's 500 by default. And the time between two polls is also very small,
> > a few milliseconds. That's why this is happening.
> >
> > You may need to try this on a big number of messages so that other
> > partitions get assigned.
>
> Ah ok, so you are saying the partition assignment depends on the load
> on the topic? This was new to me, as I thought Kafka distributes the
> messages between the active consumers independent of the amount of
> data. Is there documentation available that explains how this
> correlates?
>
> If one poll() fetches them all, it's clear to me that there is not
> much left to distribute, and if the subsequent poll() happens right
> after the former one, I can imagine it stays with the same partition.
> Thanks for pointing this out.
>
> So I changed the code and used:
>
>     max_poll_records=1
>
> That effectively means one poll() per message. I added a sleep of
> 1 sec between polls and started two consumers (read.py) with some
> delay, so that there are a number of poll() calls from different
> consumers with different timing.
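>
> For reference, a minimal sketch of the modified read.py, assuming
> kafka-python; topic name, group id and broker address are placeholders:
>
>     import time
>     from kafka import KafkaConsumer
>
>     consumer = KafkaConsumer(
>         "my-topic",
>         bootstrap_servers="broker:9092",
>         group_id="my-group",      # both consumers share this group id
>         max_poll_records=1,       # one record per poll()
>     )
>
>     while True:
>         records = consumer.poll(timeout_ms=1000)
>         for tp, messages in records.items():
>             for message in messages:
>                 print(f"partition={tp.partition} offset={message.offset}")
>         time.sleep(1)             # 1 sec pause between polls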
>
> From this situation I tested producing messages:
>
>    20
>    200
>    2000
>    200000
>
> There was no case in which messages got distributed between the
> two consumers. It was always one of them receiving all of the
> messages. If one of the consumers was closed, the other continued
> receiving the messages.
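>
> For completeness, a sketch of what the producing side could look like
> (assuming kafka-python; the actual producer script is not shown here and
> broker/topic names are placeholders):
>
>     from kafka import KafkaProducer
>
>     # With key=None the client chooses the partition for each message;
>     # a fixed key would pin every message to a single partition.
>     producer = KafkaProducer(bootstrap_servers="broker:9092")
>     for i in range(200):
>         producer.send("my-topic", key=None, value=f"message {i}".encode())
>     producer.flush()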
>
> I must admit I did not wait forever (1 sec between polls is long :-)),
> but I also don't think it would have changed while processing.
>
> > I tried my best to participate in the discussion, I am not an expert though 😊
>
> Thanks much for looking into this. I still think something is
> wrong in either my testing/code or on the cluster. Maybe some
> details on the Kafka setup will help:
>
> This is the server setup:
>
>     auto.create.topics.enable=false
>     default.replication.factor=3
>     min.insync.replicas=2
>     num.io.threads=8
>     num.network.threads=5
>     num.partitions=20
>     num.replica.fetchers=2
>     replica.lag.time.max.ms=30000
>     socket.receive.buffer.bytes=102400
>     socket.request.max.bytes=104857600
>     socket.send.buffer.bytes=102400
>     unclean.leader.election.enable=true
>     zookeeper.session.timeout.ms=18000
>
> This is the Kafka version:
>
> "CurrentBrokerSoftwareInfo": {
>     "KafkaVersion": "2.6.1"
> }
>
>
> Do you see any setting that impacts the dynamic partition assignment?
>
> I was reading about the api_version exchange and that it can
> impact the availability of features:
>
>     (0, 9) enables full group coordination features with automatic
>            partition assignment and rebalancing,
>     (0, 8, 2) enables kafka-storage offset commits with manual
>            partition assignment only,
>     (0, 8, 1) enables zookeeper-storage offset commits with manual
>            partition assignment only,
>     (0, 8, 0) enables basic functionality but requires manual
>            partition assignment and offset management.
>
>
> I was not able to find out which api_version the broker as set up
> by Amazon MSK speaks, though.
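>
> On the client side, a minimal sketch assuming kafka-python, where
> api_version=None (the default) makes the client probe the broker and
> negotiate, while a tuple such as (0, 10) would pin the protocol level:
>
>     from kafka import KafkaConsumer
>
>     consumer = KafkaConsumer(
>         "my-topic",                       # placeholder topic
>         bootstrap_servers="broker:9092",  # placeholder broker
>         group_id="my-group",              # placeholder group id
>         api_version=None,                 # probe and negotiate
>     )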
>
> Thanks
>
> Regards,
> Marcus
> --
>  Public Key available via: https://keybase.io/marcus_schaefer/key.asc
>  keybase search marcus_schaefer
>  -------------------------------------------------------
>  Marcus Schäfer                 Am Unterösch 9
>  Tel: +49 7562 905437           D-88316 Isny / Rohrdorf
>  Germany
>  -------------------------------------------------------
>
