Re: kafka partitions api
I was going to make a separate email thread for this question but this thread's topic echoes what my own would have been. How can I query a broker or zookeeper for the number of partitions in a given topic? I'm trying to write a custom partitioner that sends a message to every partition within a topic, and so I need to know the total number of partitions before I call Producer.send(). Alex On Thu, Feb 26, 2015 at 7:32 PM, tao xiao xiaotao...@gmail.com wrote: Gaurav, You can get the partition number the message belongs to via MessageAndMetadata.partition() On Fri, Feb 27, 2015 at 5:16 AM, Jun Rao j...@confluent.io wrote: The partition api is exposed to the consumer in 0.8.2. Thanks, Jun On Thu, Feb 26, 2015 at 10:53 AM, Gaurav Agarwal gaurav130...@gmail.com wrote: After retrieving a kafka stream or kafka message how to get the corresponding partition number to which it belongs ? I am using kafka version 0.8.1. More specifically kafka.consumer.KafkaStream and kafka.message.MessageAndMetaData classes, does not provide API to retrieve partition number. Are there any other API's to get the partition number? IF there are multiple partitions of a topic ,Do i need to declare from java code how many partitions the topic contains or i can leave it topic Kafkastream will take the partition information from kafka broker at runtime.? -- Regards, Tao
Re: kafka partitions api
Alex, You can get partition from MessageAndMetadata as partition is exported via constructor parameter On Fri, Feb 27, 2015 at 2:12 PM, Alex Melville amelvi...@g.hmc.edu wrote: Tao and Gaurav, After looking through the source code in Kafka v8.2.0, I don't see any partition() function on the MessageAndMetadata object. Here's the class's source: package kafka.message import kafka.serializer.Decoder import kafka.utils.Utils case class MessageAndMetadata[K, V](topic: String, partition: Int, private val rawMessage: Message, offset: Long, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) { /** * Return the decoded message key and payload */ def key(): K = if(rawMessage.key == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(rawMessage.key)) def message(): V = if(rawMessage.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(rawMessage.payload)) -Alex M. On Thu, Feb 26, 2015 at 9:54 PM, Gaurav Agarwal gaurav130...@gmail.com wrote: that's fine to me , you can open a separate thread , But the original question when the consumerconnector got connected to a separate topic , Whether KafkaStream will have all the information of the partitions for that corresponding topic , Please confirm Thanks On Fri, Feb 27, 2015 at 11:20 AM, Alex Melville amelvi...@g.hmc.edu wrote: I was going to make a separate email thread for this question but this thread's topic echoes what my own would have been. How can I query a broker or zookeeper for the number of partitions in a given topic? I'm trying to write a custom partitioner that sends a message to every partition within a topic, and so I need to know the total number of partitions before I call Producer.send(). Alex On Thu, Feb 26, 2015 at 7:32 PM, tao xiao xiaotao...@gmail.com wrote: Gaurav, You can get the partition number the message belongs to via MessageAndMetadata.partition() On Fri, Feb 27, 2015 at 5:16 AM, Jun Rao j...@confluent.io wrote: The partition api is exposed to the consumer in 0.8.2. Thanks, Jun On Thu, Feb 26, 2015 at 10:53 AM, Gaurav Agarwal gaurav130...@gmail.com wrote: After retrieving a kafka stream or kafka message how to get the corresponding partition number to which it belongs ? I am using kafka version 0.8.1. More specifically kafka.consumer.KafkaStream and kafka.message.MessageAndMetaData classes, does not provide API to retrieve partition number. Are there any other API's to get the partition number? IF there are multiple partitions of a topic ,Do i need to declare from java code how many partitions the topic contains or i can leave it topic Kafkastream will take the partition information from kafka broker at runtime.? -- Regards, Tao -- Regards, Tao
Re: kafka partitions api
that's fine to me , you can open a separate thread , But the original question when the consumerconnector got connected to a separate topic , Whether KafkaStream will have all the information of the partitions for that corresponding topic , Please confirm Thanks On Fri, Feb 27, 2015 at 11:20 AM, Alex Melville amelvi...@g.hmc.edu wrote: I was going to make a separate email thread for this question but this thread's topic echoes what my own would have been. How can I query a broker or zookeeper for the number of partitions in a given topic? I'm trying to write a custom partitioner that sends a message to every partition within a topic, and so I need to know the total number of partitions before I call Producer.send(). Alex On Thu, Feb 26, 2015 at 7:32 PM, tao xiao xiaotao...@gmail.com wrote: Gaurav, You can get the partition number the message belongs to via MessageAndMetadata.partition() On Fri, Feb 27, 2015 at 5:16 AM, Jun Rao j...@confluent.io wrote: The partition api is exposed to the consumer in 0.8.2. Thanks, Jun On Thu, Feb 26, 2015 at 10:53 AM, Gaurav Agarwal gaurav130...@gmail.com wrote: After retrieving a kafka stream or kafka message how to get the corresponding partition number to which it belongs ? I am using kafka version 0.8.1. More specifically kafka.consumer.KafkaStream and kafka.message.MessageAndMetaData classes, does not provide API to retrieve partition number. Are there any other API's to get the partition number? IF there are multiple partitions of a topic ,Do i need to declare from java code how many partitions the topic contains or i can leave it topic Kafkastream will take the partition information from kafka broker at runtime.? -- Regards, Tao
Re: kafka partitions api
Tao and Gaurav, After looking through the source code in Kafka v8.2.0, I don't see any partition() function on the MessageAndMetadata object. Here's the class's source: package kafka.message import kafka.serializer.Decoder import kafka.utils.Utils case class MessageAndMetadata[K, V](topic: String, partition: Int, private val rawMessage: Message, offset: Long, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) { /** * Return the decoded message key and payload */ def key(): K = if(rawMessage.key == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(rawMessage.key)) def message(): V = if(rawMessage.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(rawMessage.payload)) -Alex M. On Thu, Feb 26, 2015 at 9:54 PM, Gaurav Agarwal gaurav130...@gmail.com wrote: that's fine to me , you can open a separate thread , But the original question when the consumerconnector got connected to a separate topic , Whether KafkaStream will have all the information of the partitions for that corresponding topic , Please confirm Thanks On Fri, Feb 27, 2015 at 11:20 AM, Alex Melville amelvi...@g.hmc.edu wrote: I was going to make a separate email thread for this question but this thread's topic echoes what my own would have been. How can I query a broker or zookeeper for the number of partitions in a given topic? I'm trying to write a custom partitioner that sends a message to every partition within a topic, and so I need to know the total number of partitions before I call Producer.send(). Alex On Thu, Feb 26, 2015 at 7:32 PM, tao xiao xiaotao...@gmail.com wrote: Gaurav, You can get the partition number the message belongs to via MessageAndMetadata.partition() On Fri, Feb 27, 2015 at 5:16 AM, Jun Rao j...@confluent.io wrote: The partition api is exposed to the consumer in 0.8.2. Thanks, Jun On Thu, Feb 26, 2015 at 10:53 AM, Gaurav Agarwal gaurav130...@gmail.com wrote: After retrieving a kafka stream or kafka message how to get the corresponding partition number to which it belongs ? I am using kafka version 0.8.1. More specifically kafka.consumer.KafkaStream and kafka.message.MessageAndMetaData classes, does not provide API to retrieve partition number. Are there any other API's to get the partition number? IF there are multiple partitions of a topic ,Do i need to declare from java code how many partitions the topic contains or i can leave it topic Kafkastream will take the partition information from kafka broker at runtime.? -- Regards, Tao
Re: kafka partitions api
The partition api is exposed to the consumer in 0.8.2. Thanks, Jun On Thu, Feb 26, 2015 at 10:53 AM, Gaurav Agarwal gaurav130...@gmail.com wrote: After retrieving a kafka stream or kafka message how to get the corresponding partition number to which it belongs ? I am using kafka version 0.8.1. More specifically kafka.consumer.KafkaStream and kafka.message.MessageAndMetaData classes, does not provide API to retrieve partition number. Are there any other API's to get the partition number? IF there are multiple partitions of a topic ,Do i need to declare from java code how many partitions the topic contains or i can leave it topic Kafkastream will take the partition information from kafka broker at runtime.?
Re: kafka partitions api
Gaurav, You can get the partition number the message belongs to via MessageAndMetadata.partition() On Fri, Feb 27, 2015 at 5:16 AM, Jun Rao j...@confluent.io wrote: The partition api is exposed to the consumer in 0.8.2. Thanks, Jun On Thu, Feb 26, 2015 at 10:53 AM, Gaurav Agarwal gaurav130...@gmail.com wrote: After retrieving a kafka stream or kafka message how to get the corresponding partition number to which it belongs ? I am using kafka version 0.8.1. More specifically kafka.consumer.KafkaStream and kafka.message.MessageAndMetaData classes, does not provide API to retrieve partition number. Are there any other API's to get the partition number? IF there are multiple partitions of a topic ,Do i need to declare from java code how many partitions the topic contains or i can leave it topic Kafkastream will take the partition information from kafka broker at runtime.? -- Regards, Tao