>> Further, we looked into the quick start guide of Kafka to produce data to the same partition but the constructor mentioned in the point-6 that takes an extra parameter for the partition key is not available in kafka-0.7. Am I missing something here?
Instead of that, the ProducerData object takes in a key for a set of messages. It uses the "partitioner.class" to pick a partition id. The default partitioner picks a partition id at random. You can also plug in your own custom partitioner if you want a different partitioning scheme. Messages are delivered in order per partition.

Thanks,
Neha

On Mon, Feb 27, 2012 at 9:43 AM, Jun Rao <jun...@gmail.com> wrote:

> Anuj,
>
> Sequencing should be guaranteed within a partition. The problem you saw
> could be related to reconnect.interval, which controls how frequently the
> producer gets a new socket. Every time a producer gets a new socket, there
> is a small window in which events sent over the old socket and over the
> new one may be out of order. You can set reconnect.interval to maxInt to
> avoid new sockets being created.
>
> Thanks,
>
> Jun
>
> On Sun, Feb 26, 2012 at 8:06 PM, Anuj Kumar <anujs...@gmail.com> wrote:
>
>> Hello Everyone,
>>
>> We are using Kafka-0.7.0 to push a sequence of numbers and we want to
>> preserve the exact sequence in which they are produced by the producer. We
>> wrote a sample producer that simply produces a million points in a
>> sequence. The code snippet is-
>>
>> int t = 0;
>> while (t <= 1000000)
>> {
>>     producer.send(new ProducerData<String, String>(topic, new Integer(t).toString()));
>>     ++t;
>> }
>>
>> where the producer is initialized as-
>>
>> Properties props = new Properties();
>> props.put("serializer.class", "kafka.serializer.StringEncoder");
>> props.put("zk.connect", "localhost:2181");
>> producer = new Producer<String, String>(new ProducerConfig(props));
>>
>> and topic is taken as an input to the constructor of the Producer class.
>>
>> On the consumer side, our code looks like-
>>
>> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>> topicCountMap.put(topic, new Integer(1));
>> Map<String, List<KafkaMessageStream<Message>>> consumerMap =
>>     consumer.createMessageStreams(topicCountMap);
>> KafkaMessageStream<Message> stream = consumerMap.get(topic).get(0);
>> ConsumerIterator<Message> it = stream.iterator();
>> Integer prev = null;
>> while (it.hasNext())
>> {
>>     String msg = getMessage(it.next());
>>     int current = Integer.parseInt(msg);
>>     ...
>> }
>>
>> The consumer is initialized as-
>>
>> public SimpleKafkaQueueConsumer(String _topic)
>> {
>>     topic = _topic;
>>     consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
>> }
>>
>> private static ConsumerConfig createConsumerConfig()
>> {
>>     Properties props = new Properties();
>>     props.put("zk.connect", "localhost:2181");
>>     props.put("groupid", "simpleconsumer");
>>     return new ConsumerConfig(props);
>> }
>>
>> We started both the producer and consumer with the same topic but we
>> noticed that on the consumer side the sequence of the numbers changes (only
>> for some of the numbers). We would like the sequence to be exactly the
>> same.
>>
>> We initially thought that it might be an issue with the data being stored
>> in different partitions. Is that the case?
>>
>> Further, we looked into the quick start guide of Kafka to produce data to
>> the same partition but the constructor mentioned in the point-6 that takes
>> an extra parameter for the partition key is not available in kafka-0.7. Am
>> I missing something here?
>>
>> Is there a way to preserve the sequence?
>>
>> Regards,
>> Anuj
>>
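
To illustrate the reply at the top of this thread: if every message is sent with the same key and the partitioner maps equal keys to the same partition id, all messages land in one partition, where ordering is preserved. The sketch below is self-contained Java that does not call Kafka. KeyPartitioner and HashKeyPartitioner are hypothetical names that only mimic the idea of a class plugged in through "partitioner.class", and the key "sequence-1" and the partition count of 4 are arbitrary assumptions.

```java
import java.util.HashSet;
import java.util.Set;

final class SinglePartitionSketch {

    /** Hypothetical stand-in for a pluggable partitioner; not Kafka's own interface. */
    interface KeyPartitioner<K> {
        int partition(K key, int numPartitions);
    }

    /** Hash-based mapping: equal keys always yield the same partition id. */
    static final class HashKeyPartitioner implements KeyPartitioner<String> {
        @Override
        public int partition(String key, int numPartitions) {
            // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
            return Math.floorMod(key.hashCode(), numPartitions);
        }
    }

    /** Collects the distinct partition ids chosen when the same key is used for every message. */
    static Set<Integer> distinctPartitions(KeyPartitioner<String> partitioner,
                                           String key, int messageCount, int numPartitions) {
        Set<Integer> seen = new HashSet<Integer>();
        for (int i = 0; i < messageCount; i++) {
            seen.add(partitioner.partition(key, numPartitions));
        }
        return seen;
    }

    public static void main(String[] args) {
        KeyPartitioner<String> partitioner = new HashKeyPartitioner();
        // Assumed values for illustration only.
        Set<Integer> seen = distinctPartitions(partitioner, "sequence-1", 1000000, 4);
        System.out.println("distinct partitions used: " + seen.size());
    }
}
```

With the 0.7 producer, the corresponding step would be to pass one constant key in ProducerData and, if the default choice is not what you want, to name your own class in "partitioner.class". The exact constructor and interface signatures should be checked against the 0.7 javadoc.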