Thanks Neha. Tricky indeed. For now we are going with Jun's suggestion, but we will look into implementing a custom partitioner.
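A custom partitioner in Kafka 0.7 implements the kafka.producer.Partitioner interface. Below is a minimal sketch of just the routing logic, written as a plain class so it stands alone; the name KeyPartitioner and the hash-mod scheme are illustrative, not something from this thread:

```java
// Illustrative sketch of the logic you would put in a class implementing
// kafka.producer.Partitioner<String> (Kafka 0.7). The class name and the
// hash-mod scheme are hypothetical examples, not from the thread.
class KeyPartitioner {
    // Map a message key deterministically to one of numPartitions partitions,
    // so that all messages sharing a key land on the same partition.
    public int partition(String key, int numPartitions) {
        // Mask the sign bit so hashCode() can never make the modulo negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

Because every message with the same key maps to the same partition, and Kafka delivers messages in order within a partition, this is what preserves their relative order.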
- Anuj

On Tue, Feb 28, 2012 at 11:36 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>> Yes, it is clear from the send() documentation but isn't it true that the
>> above property will guarantee a single partition?
>
> Yes, it is a little tricky to understand. Currently, the Kafka server
> behavior is that every topic lives on every broker. So, if you have
> one Kafka server with one partition, you truly have only a single
> partition for that topic. Now, if you bring up another Kafka server
> with a single partition, the topic *can* have 2 partitions, if you
> don't use your own custom partitioner. If you stay with our default
> partitioner, it will think you have 2 available partitions for that
> topic, and will try to send data to both of those partitions.
>
> Thanks,
> Neha
>
> On Tue, Feb 28, 2012 at 9:47 AM, Anuj Kumar <anujs...@gmail.com> wrote:
>> Thanks Jun for the explanation.
>>
>> Hi Neha,
>>
>> Yes, we have num.partitions set to 1 in server.properties:
>>
>> # The number of logical partitions per topic per server. More partitions
>> # allow greater parallelism for consumption, but also mean more files.
>> num.partitions=1
>>
>> Yes, it is clear from the send() documentation but isn't it true that the
>> above property will guarantee a single partition?
>>
>> Thanks,
>> Anuj
>>
>> On Tue, Feb 28, 2012 at 11:08 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>> Anuj,
>>>
>>> Here are the API docs for the Producer -
>>> http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/docs/
>>>
>>> The API docs will show that the argument to send() is a ProducerData
>>> object, which takes in a topic, a list of messages, and an optional key.
>>>
>>> When you say you've set the number of partitions to 1, do you mean
>>> you have one broker with num.partitions=1?
>>>
>>> Thanks,
>>> Neha
>>>
>>> On Mon, Feb 27, 2012 at 11:56 PM, Anuj Kumar <anujs...@gmail.com> wrote:
>>>> Thanks Jun and Neha.
>>>>
>>>> Setting reconnect.interval to a higher value works, but this doesn't look
>>>> like a good solution to me. I would like to know why we have
>>>> reconnect.interval. If the socket is already active and sending data, why
>>>> do we want to disconnect it and connect again? I am trying to understand
>>>> the use case where reconnect.interval would be useful.
>>>>
>>>> Regarding the partition key, we have already set the number of partitions
>>>> to 1, so do we need to explicitly use that ProducerData constructor?
>>>>
>>>> Thanks for your help.
>>>>
>>>> Regards,
>>>> Anuj
>>>>
>>>> On Mon, Feb 27, 2012 at 11:31 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>> Further, we looked into the quick start guide of Kafka to produce data
>>>>>> to the same partition but the constructor mentioned in point 6 that takes
>>>>>> an extra parameter for the partition key is not available in kafka-0.7.
>>>>>> Am I missing something here?
>>>>>
>>>>> Instead of that, the ProducerData object takes in a key for a set of
>>>>> messages. It uses the "partitioner.class" to pick a partition id. The
>>>>> default partitioner picks a partition id at random. You can also plug
>>>>> in your own custom partitioner if you want a different partitioning
>>>>> scheme. Messages are delivered in order per partition.
>>>>>
>>>>> Thanks,
>>>>> Neha
>>>>>
>>>>> On Mon, Feb 27, 2012 at 9:43 AM, Jun Rao <jun...@gmail.com> wrote:
>>>>>> Anuj,
>>>>>>
>>>>>> Sequencing should be guaranteed within a partition. The problem you saw
>>>>>> could be related to reconnect.interval, which controls how frequently the
>>>>>> producer gets a new socket. Every time a producer gets a new socket, there
>>>>>> is a small window during which events sent over the old socket and over
>>>>>> the new one may be out of order.
>>>>>> You can set reconnect.interval to maxInt to
>>>>>> avoid new sockets being created.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jun
>>>>>>
>>>>>> On Sun, Feb 26, 2012 at 8:06 PM, Anuj Kumar <anujs...@gmail.com> wrote:
>>>>>>> Hello Everyone,
>>>>>>>
>>>>>>> We are using Kafka-0.7.0 to push a sequence of numbers and we want to
>>>>>>> preserve the exact sequence in which they are produced by the producer.
>>>>>>> We wrote a sample producer that simply produces a million points in
>>>>>>> sequence. The code snippet is:
>>>>>>>
>>>>>>> int t = 0;
>>>>>>> while (t <= 1000000)
>>>>>>> {
>>>>>>>     producer.send(new ProducerData<String, String>(topic,
>>>>>>>             new Integer(t).toString()));
>>>>>>>     ++t;
>>>>>>> }
>>>>>>>
>>>>>>> where the producer is initialized as:
>>>>>>>
>>>>>>> Properties props = new Properties();
>>>>>>> props.put("serializer.class", "kafka.serializer.StringEncoder");
>>>>>>> props.put("zk.connect", "localhost:2181");
>>>>>>> producer = new Producer<String, String>(new ProducerConfig(props));
>>>>>>>
>>>>>>> and topic is taken as an input to the constructor of the Producer class.
>>>>>>>
>>>>>>> On the consumer side, our code looks like:
>>>>>>>
>>>>>>> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>>>>>>> topicCountMap.put(topic, new Integer(1));
>>>>>>> Map<String, List<KafkaMessageStream<Message>>> consumerMap =
>>>>>>>         consumer.createMessageStreams(topicCountMap);
>>>>>>> KafkaMessageStream<Message> stream = consumerMap.get(topic).get(0);
>>>>>>> ConsumerIterator<Message> it = stream.iterator();
>>>>>>> Integer prev = null;
>>>>>>> while (it.hasNext())
>>>>>>> {
>>>>>>>     String msg = getMessage(it.next());
>>>>>>>     int current = Integer.parseInt(msg);
>>>>>>>     ...
>>>>>>> }
>>>>>>>
>>>>>>> The consumer is initialized as:
>>>>>>>
>>>>>>> public SimpleKafkaQueueConsumer(String _topic)
>>>>>>> {
>>>>>>>     topic = _topic;
>>>>>>>     consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
>>>>>>> }
>>>>>>>
>>>>>>> private static ConsumerConfig createConsumerConfig()
>>>>>>> {
>>>>>>>     Properties props = new Properties();
>>>>>>>     props.put("zk.connect", "localhost:2181");
>>>>>>>     props.put("groupid", "simpleconsumer");
>>>>>>>     return new ConsumerConfig(props);
>>>>>>> }
>>>>>>>
>>>>>>> We started both the producer and the consumer with the same topic, but
>>>>>>> we noticed that on the consumer side the sequence of the numbers changes
>>>>>>> (only for some of the numbers). We would like the sequence to be exactly
>>>>>>> the same.
>>>>>>>
>>>>>>> We initially thought that it might be an issue with the data being
>>>>>>> stored in different partitions. Is that the case?
>>>>>>>
>>>>>>> Further, we looked into the quick start guide of Kafka to produce data
>>>>>>> to the same partition, but the constructor mentioned in point 6 that
>>>>>>> takes an extra parameter for the partition key is not available in
>>>>>>> kafka-0.7. Am I missing something here?
>>>>>>>
>>>>>>> Is there a way to preserve the sequence?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Anuj
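Taken together, the two remedies discussed in this thread amount to producer configuration plus a partitioner class. A sketch of the relevant producer properties for Kafka 0.7 follows; com.example.KeyPartitioner is a placeholder for whatever custom partitioner class you supply, not a real class:

```properties
# Producer settings sketch for Kafka 0.7; values are illustrative.
serializer.class=kafka.serializer.StringEncoder
zk.connect=localhost:2181
# Route messages by key instead of using the default random partitioner.
# com.example.KeyPartitioner is a placeholder for your own class.
partitioner.class=com.example.KeyPartitioner
# Per Jun's suggestion: avoid periodic socket reconnects, which open a
# small window where messages can be reordered (maxInt = 2147483647).
reconnect.interval=2147483647
```

With these settings, messages sent with the same ProducerData key go to one partition, where Kafka guarantees in-order delivery.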