This was reported by one of the users, so I looked at the code, as that was the only thing I had to go by. I'm trying to get more details.
> On Apr 2, 2016, at 01:07, Ewen Cheslack-Postava <e...@confluent.io> wrote:
>
> Oleg,
>
> Normally the number of partitions doesn't change (or infrequently, at
> least) so regardless of how you got the number of partitions there
> shouldn't be an inconsistency. Are you actually seeing an inconsistency
> causing this exception? And is the number of partitions not changing? Is it
> possible a simple off-by-one error is the issue, where you're specifying
> `numPartitions` instead of a value in [0, `numPartitions`-1]?
>
> -Ewen
>
> On Mon, Mar 28, 2016 at 2:20 PM, Oleg Zhurakousky <
> ozhurakou...@hortonworks.com> wrote:
>
>> Hi
>>
>> It seems there are several ways to get to the same number in Kafka API.
>> In Kafka Partitioner which is invoked by KafkaProducer we have this
>>
>> public int partition(ProducerRecord<byte[], byte[]> record, Cluster cluster) {
>>     List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
>>     int numPartitions = partitions.size();
>>     . . .
>>
>> And then we have KafkaProducer.partitionsFor(topicName);
>>
>> It appears that the two may result in a different number causing failure
>> during internal validation in Partitioner
>>
>> // they have given us a partition, use it
>> if (record.partition() < 0 || record.partition() >= numPartitions)
>>     throw new IllegalArgumentException("Invalid partition given with record: " + record.partition()
>>                                        + " is not in the range [0..."
>>                                        + numPartitions
>>                                        + "].");
>>
>>
>> Basically we have RoundRobin partitioner that uses
>> KafkaProducer.partitionsFor(topicName) to calculate the cycle, but getting
>> the above error.
>> Could someone please explain the difference between the two methods to get
>> partitions size?
>>
>> Cheers
>> Oleg
>
>
> --
> Thanks,
> Ewen
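
For reference, the off-by-one case Ewen describes can be ruled out with a cycle counter that always returns a value in [0, numPartitions-1]. The following is only a minimal sketch, not the user's actual partitioner: the class `RoundRobinSketch` and its method `nextPartition` are hypothetical names, and `numPartitions` is assumed to be the size of the list obtained from KafkaProducer.partitionsFor(topicName).

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of the Kafka API.
final class RoundRobinSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    // numPartitions is assumed to come from something like
    // producer.partitionsFor(topicName).size().
    int nextPartition(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive: " + numPartitions);
        }
        // floorMod keeps the result in [0, numPartitions-1],
        // even after the counter overflows to a negative value.
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch();
        for (int i = 0; i < 5; i++) {
            System.out.println(rr.nextPartition(3)); // cycles 0, 1, 2, 0, 1
        }
    }
}
```

If a counter like this is in place and the exception still occurs, the partition count itself would have to differ between the two lookups, which is the case worth gathering details on.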