Anuj,

Sequencing should be guaranteed within a partition. The problem you saw could be related to reconnect.interval, which controls how frequently the producer gets a new socket. Every time a producer gets a new socket, there is a small window in which events sent over the old socket and events sent over the new one may arrive out of order. You can set reconnect.interval to maxInt to avoid new sockets being created.
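A minimal sketch of that setting, assuming the producer reads reconnect.interval from the same Properties object you already pass to ProducerConfig. The class and method names here are only for illustration, and the producer construction is left as a comment so the snippet compiles without the Kafka jar:

```java
import java.util.Properties;

public class OrderedProducerProps {
    // Builds the producer properties from the original message and adds
    // reconnect.interval set to the largest int value.
    public static Properties build() {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("zk.connect", "localhost:2181");
        // Assumption: the producer picks this key up from the same Properties.
        props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println("reconnect.interval=" + props.getProperty("reconnect.interval"));
        // With the Kafka 0.7 jar on the classpath, as in your code:
        // producer = new Producer<String, String>(new ProducerConfig(props));
    }
}
```

This only removes the reconnect window as a source of reordering; ordering is still only expected within a single partition.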

Thanks,
Jun

On Sun, Feb 26, 2012 at 8:06 PM, Anuj Kumar <anujs...@gmail.com> wrote:

> Hello Everyone,
>
> We are using Kafka-0.7.0 to push a sequence of numbers and we want to
> preserve the exact sequence in which they are produced by the producer. We
> wrote a sample producer that simply produces a million points in a
> sequence. The code snippet is-
>
> int t = 0;
> while( t<=1000000)
> {
>     producer.send( new ProducerData<String,String>( topic,new
>     Integer(t).toString()) );
>     ++t;
> }
>
> where, the producer is initialized as-
>
> Properties props = new Properties();
> props.put("serializer.class", "kafka.serializer.StringEncoder");
> props.put("zk.connect", "localhost:2181");
> producer = new Producer<String, String>(new ProducerConfig(props));
>
> and topic is taken as an input to the constructor of the Producer class.
>
> On the consumer side, our code looks like-
>
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> topicCountMap.put(topic, new Integer(1));
> Map<String, List<KafkaMessageStream<Message>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
> KafkaMessageStream<Message> stream = consumerMap.get(topic).get(0);
> ConsumerIterator<Message> it = stream.iterator();
> Integer prev = null;
> while(it.hasNext())
> {
>     String msg = getMessage(it.next());
>     int current = Integer.parseInt(msg);
>     ...
> }
>
> The consumer is initialized as-
>
> public SimpleKafkaQueueConsumer(String _topic)
> {
>     topic = _topic;
>     consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
> }
>
> private static ConsumerConfig createConsumerConfig()
> {
>     Properties props = new Properties();
>     props.put("zk.connect", "localhost:2181");
>     props.put("groupid", "simpleconsumer");
>     return new ConsumerConfig(props);
> }
>
> We started both the producer and consumer with the same topic but we
> noticed that on the consumer side the sequence of the number changes (only
> for some of the numbers). We would like the sequence to be exactly the
> same.
>
> We initially thought that it might be an issue with the data being stored
> in different partitions. Is that the case?
>
> Further, we looked into the quick start guide of Kafka to produce data to
> the same partition but the constructor mentioned in the point-6 that takes
> an extra parameter for the partition key is not available in kafka-0.7. Am
> I missing something here?
>
> Is there a way to preserve the sequence?
>
> Regards,
> Anuj
>