Thanks Jun for the explanation. Hi Neha,
Yes, we have num.partitions set to 1 in server.properties-

# The number of logical partitions per topic per server. More partitions
# allow greater parallelism for consumption, but also mean more files.
num.partitions=1

Yes, it is clear from the send() documentation, but isn't it true that the
above property will guarantee a single partition?
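In case it helps, here is roughly what I am considering on the producer side
if a single partition is not guaranteed. This is only a sketch based on
Neha's description of the keyed ProducerData constructor and Jun's
reconnect.interval suggestion, so the exact 0.7 signatures may differ; the
FixedPartitioner class name and the "fixed-key" value are made up.

import java.util.Collections;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;

Properties props = new Properties();
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("zk.connect", "localhost:2181");
// Jun's suggestion: a very large reconnect.interval so the producer keeps
// one socket and avoids the reordering window around reconnects.
props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
// Optional: a custom partitioner instead of the random default
// (FixedPartitioner is a hypothetical class name).
props.put("partitioner.class", "com.example.FixedPartitioner");

Producer<String, String> producer =
    new Producer<String, String>(new ProducerConfig(props));

// Send every message with the same key so they all map to one partition.
for (int t = 0; t <= 1000000; ++t)
{
    producer.send(new ProducerData<String, String>(
        topic, "fixed-key", Collections.singletonList(Integer.toString(t))));
}

Does that look like the intended way to use the key?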
Thanks,
Anuj

On Tue, Feb 28, 2012 at 11:08 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> Anuj,
>
> Here are the API docs for the Producer -
> http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/docs/
>
> The API docs will show that the send() request is a ProducerData
> object, that takes in a topic, list of messages, and an optional key.
>
> When you say you've set the number of partitions to 1, do you mean
> you have one broker with num.partitions=1?
>
> Thanks,
> Neha
>
> On Mon, Feb 27, 2012 at 11:56 PM, Anuj Kumar <anujs...@gmail.com> wrote:
> > Thanks Jun and Neha.
> >
> > Setting reconnect.interval to a higher value works, but this doesn't look
> > like a good solution to me. I would like to know why we have
> > reconnect.interval. If the socket is already active and sending data, why
> > do we want to disconnect it and connect again? I am trying to understand
> > the use case where reconnect.interval will be useful.
> >
> > Regarding the partition key, we have already set the number of partitions
> > to 1, so do we need to explicitly use that ProducerData constructor?
> >
> > Thanks for your help.
> >
> > Regards,
> > Anuj
> >
> > On Mon, Feb 27, 2012 at 11:31 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >
> >> >> Further, we looked into the quick start guide of Kafka to produce data
> >> >> to the same partition but the constructor mentioned in point-6 that
> >> >> takes an extra parameter for the partition key is not available in
> >> >> kafka-0.7. Am I missing something here?
> >>
> >> Instead of that, the ProducerData object takes in a key for a set of
> >> messages. It uses the "partitioner.class" to pick a partition id. The
> >> default partitioner picks a partition id at random. You can also plug
> >> in your own custom partitioner if you want a different partitioning
> >> scheme. Messages are delivered in order per partition.
> >>
> >> Thanks,
> >> Neha
> >>
> >> On Mon, Feb 27, 2012 at 9:43 AM, Jun Rao <jun...@gmail.com> wrote:
> >> > Anuj,
> >> >
> >> > Sequencing should be guaranteed within a partition. The problem you saw
> >> > could be related to reconnect.interval, which controls how frequently
> >> > the producer gets a new socket. Every time a producer gets a new socket,
> >> > there is a small window during which events sent over the old socket and
> >> > over the new one may be out of order. You can set reconnect.interval to
> >> > maxInt to avoid new sockets being created.
> >> >
> >> > Thanks,
> >> >
> >> > Jun
> >> >
> >> > On Sun, Feb 26, 2012 at 8:06 PM, Anuj Kumar <anujs...@gmail.com> wrote:
> >> >
> >> >> Hello Everyone,
> >> >>
> >> >> We are using Kafka-0.7.0 to push a sequence of numbers and we want to
> >> >> preserve the exact sequence in which they are produced by the producer.
> >> >> We wrote a sample producer that simply produces a million points in a
> >> >> sequence. The code snippet is-
> >> >>
> >> >> int t = 0;
> >> >> while (t <= 1000000)
> >> >> {
> >> >>     producer.send(new ProducerData<String, String>(topic,
> >> >>         new Integer(t).toString()));
> >> >>     ++t;
> >> >> }
> >> >>
> >> >> where the producer is initialized as-
> >> >>
> >> >> Properties props = new Properties();
> >> >> props.put("serializer.class", "kafka.serializer.StringEncoder");
> >> >> props.put("zk.connect", "localhost:2181");
> >> >> producer = new Producer<String, String>(new ProducerConfig(props));
> >> >>
> >> >> and topic is taken as an input to the constructor of the Producer class.
> >> >>
> >> >> On the consumer side, our code looks like-
> >> >>
> >> >> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >> >> topicCountMap.put(topic, new Integer(1));
> >> >> Map<String, List<KafkaMessageStream<Message>>> consumerMap =
> >> >>     consumer.createMessageStreams(topicCountMap);
> >> >> KafkaMessageStream<Message> stream = consumerMap.get(topic).get(0);
> >> >> ConsumerIterator<Message> it = stream.iterator();
> >> >> Integer prev = null;
> >> >> while (it.hasNext())
> >> >> {
> >> >>     String msg = getMessage(it.next());
> >> >>     int current = Integer.parseInt(msg);
> >> >>     ...
> >> >> }
> >> >>
> >> >> The consumer is initialized as-
> >> >>
> >> >> public SimpleKafkaQueueConsumer(String _topic)
> >> >> {
> >> >>     topic = _topic;
> >> >>     consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
> >> >> }
> >> >>
> >> >> private static ConsumerConfig createConsumerConfig()
> >> >> {
> >> >>     Properties props = new Properties();
> >> >>     props.put("zk.connect", "localhost:2181");
> >> >>     props.put("groupid", "simpleconsumer");
> >> >>     return new ConsumerConfig(props);
> >> >> }
> >> >>
> >> >> We started both the producer and consumer with the same topic, but we
> >> >> noticed that on the consumer side the sequence of the numbers changes
> >> >> (only for some of the numbers). We would like the sequence to be exactly
> >> >> the same.
> >> >>
> >> >> We initially thought that it might be an issue with the data being stored
> >> >> in different partitions. Is that the case?
> >> >>
> >> >> Further, we looked into the quick start guide of Kafka to produce data
> >> >> to the same partition but the constructor mentioned in point-6 that
> >> >> takes an extra parameter for the partition key is not available in
> >> >> kafka-0.7. Am I missing something here?
> >> >>
> >> >> Is there a way to preserve the sequence?
> >> >>
> >> >> Regards,
> >> >> Anuj
> >> >>
> >> >
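P.S. For completeness, the kind of custom partitioner I would plug in via
partitioner.class would look roughly like the sketch below. This assumes the
kafka.producer.Partitioner interface in 0.7 exposes a partition(key,
numPartitions) method; the class name is made up and matches the hypothetical
one in the producer sketch above.

import kafka.producer.Partitioner;

// Hypothetical partitioner that ignores the key and always returns
// partition 0, so every message lands on a single partition.
public class FixedPartitioner implements Partitioner<String>
{
    public int partition(String key, int numPartitions)
    {
        return 0;
    }
}

If I understand Neha's explanation correctly, the key passed to ProducerData
is what gets handed to partition() here, and ordering is then preserved
because everything stays in one partition.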