Hi,
My apologies - a newbie here.
Looking at the getPartition() method, I notice that the value returned for a
keyed message is the "index" of a partition, but when there is no key, the
value returned is the "id" of a partition.
    case None =>
      val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
      if (availablePartitions.isEmpty)
        throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
      val index = Utils.abs(Random.nextInt) % availablePartitions.size
      val partitionId = availablePartitions(index).partitionId
      sendPartitionPerTopicCache.put(topic, partitionId)
      << partitionId >>
It probably won't make a difference when there are no gaps in the sequence of
partitions. However, when there is a gap (e.g. some partitions are not
available), the partition id and the index would differ.
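To make the gap case concrete, here is a small standalone sketch. It is not the actual Kafka code: PartitionMeta, IndexVsId and the broker ids are made up for illustration, and only the filter on leaderBrokerIdOpt mirrors the snippet above. It assumes a topic with three partitions where partition 1 has no leader.

```scala
// Hypothetical stand-in for the partition metadata; not the actual Kafka class.
case class PartitionMeta(partitionId: Int, leaderBrokerIdOpt: Option[Int])

object IndexVsId {
  // Partition 1 has no leader, so it leaves a gap in the available list.
  val topicPartitionList = Seq(
    PartitionMeta(0, Some(101)),
    PartitionMeta(1, None),
    PartitionMeta(2, Some(102))
  )

  // Same filter as in the snippet above.
  val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)

  // Pairs of (index into availablePartitions, partitionId at that index).
  val indexToId = availablePartitions.zipWithIndex.map { case (p, i) => (i, p.partitionId) }

  def main(args: Array[String]): Unit =
    indexToId.foreach { case (i, id) => println(s"index=$i partitionId=$id") }
}
```

With this input, index 1 in the available list corresponds to partition id 2, so a caller that treats the returned value as an index would pick a different partition than one that treats it as an id.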
In the partitionAndCollate() method, the semantics seem to use the index
rather than the id.
Thanks