Anuj,

Here are the API docs for the Producer:
http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/docs/
The API docs will show that send() takes a ProducerData object, which wraps
a topic, a list of messages, and an optional key. When you say you've set
the number of partitions to 1, do you mean you have one broker with
num.partitions=1?

Thanks,
Neha

On Mon, Feb 27, 2012 at 11:56 PM, Anuj Kumar <anujs...@gmail.com> wrote:
> Thanks Jun and Neha.
>
> Setting reconnect.interval to a higher value works, but this doesn't look
> like a good solution to me. I would like to know why we have
> reconnect.interval. If the socket is already active and sending data, why
> do we want to disconnect it and connect again? I am trying to understand
> the use case where reconnect.interval is useful.
>
> Regarding the partition key, we have already set the number of partitions
> to 1, so do we need to explicitly use that ProducerData constructor?
>
> Thanks for your help.
>
> Regards,
> Anuj
>
> On Mon, Feb 27, 2012 at 11:31 PM, Neha Narkhede
> <neha.narkh...@gmail.com> wrote:
>
>> Further, we looked into the quick start guide of Kafka to produce data
>> to the same partition, but the constructor mentioned in point 6 that
>> takes an extra parameter for the partition key is not available in
>> kafka-0.7. Am I missing something here?
>>
>> Instead of that, the ProducerData object takes in a key for a set of
>> messages. It uses the "partitioner.class" to pick a partition id. The
>> default partitioner picks a partition id at random. You can also plug
>> in your own custom partitioner if you want a different partitioning
>> scheme. Messages are delivered in order per partition.
>>
>> Thanks,
>> Neha
>>
>> On Mon, Feb 27, 2012 at 9:43 AM, Jun Rao <jun...@gmail.com> wrote:
>> > Anuj,
>> >
>> > Sequencing should be guaranteed within a partition. The problem you
>> > saw could be related to reconnect.interval, which controls how
>> > frequently the producer gets a new socket.
>> > Every time the producer gets a new socket, there is a small window in
>> > which events sent over the old socket and over the new one may arrive
>> > out of order. You can set reconnect.interval to Integer.MAX_VALUE to
>> > avoid new sockets being created.
>> >
>> > Thanks,
>> >
>> > Jun
>> >
>> > On Sun, Feb 26, 2012 at 8:06 PM, Anuj Kumar <anujs...@gmail.com> wrote:
>> >
>> >> Hello Everyone,
>> >>
>> >> We are using Kafka-0.7.0 to push a sequence of numbers and we want
>> >> to preserve the exact sequence in which they are produced by the
>> >> producer. We wrote a sample producer that simply produces a million
>> >> points in sequence. The code snippet is:
>> >>
>> >> int t = 0;
>> >> while (t <= 1000000)
>> >> {
>> >>     producer.send(new ProducerData<String, String>(topic,
>> >>             new Integer(t).toString()));
>> >>     ++t;
>> >> }
>> >>
>> >> where the producer is initialized as:
>> >>
>> >> Properties props = new Properties();
>> >> props.put("serializer.class", "kafka.serializer.StringEncoder");
>> >> props.put("zk.connect", "localhost:2181");
>> >> producer = new Producer<String, String>(new ProducerConfig(props));
>> >>
>> >> and the topic is taken as an input to the constructor of the
>> >> Producer class.
>> >>
>> >> On the consumer side, our code looks like:
>> >>
>> >> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>> >> topicCountMap.put(topic, new Integer(1));
>> >> Map<String, List<KafkaMessageStream<Message>>> consumerMap =
>> >>         consumer.createMessageStreams(topicCountMap);
>> >> KafkaMessageStream<Message> stream = consumerMap.get(topic).get(0);
>> >> ConsumerIterator<Message> it = stream.iterator();
>> >> Integer prev = null;
>> >> while (it.hasNext())
>> >> {
>> >>     String msg = getMessage(it.next());
>> >>     int current = Integer.parseInt(msg);
>> >>     ...
>> >> }
>> >>
>> >> The consumer is initialized as:
>> >>
>> >> public SimpleKafkaQueueConsumer(String _topic)
>> >> {
>> >>     topic = _topic;
>> >>     consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
>> >> }
>> >>
>> >> private static ConsumerConfig createConsumerConfig()
>> >> {
>> >>     Properties props = new Properties();
>> >>     props.put("zk.connect", "localhost:2181");
>> >>     props.put("groupid", "simpleconsumer");
>> >>     return new ConsumerConfig(props);
>> >> }
>> >>
>> >> We started both the producer and the consumer with the same topic,
>> >> but we noticed that on the consumer side the sequence of the numbers
>> >> changes (only for some of the numbers). We would like the sequence
>> >> to be exactly the same.
>> >>
>> >> We initially thought that it might be an issue with the data being
>> >> stored in different partitions. Is that the case?
>> >>
>> >> Further, we looked into the quick start guide of Kafka to produce
>> >> data to the same partition, but the constructor mentioned in point 6
>> >> that takes an extra parameter for the partition key is not available
>> >> in kafka-0.7. Am I missing something here?
>> >>
>> >> Is there a way to preserve the sequence?
>> >>
>> >> Regards,
>> >> Anuj
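To make Neha's suggestion concrete: since Kafka only orders messages within a partition, routing every message through one deterministic partition preserves the producer's sequence. A custom partitioner in Kafka 0.7 implements kafka.producer.Partitioner and maps a key to a partition id. The sketch below is illustrative, not from the thread: the class and key names are made up, and the implements clause is left as a comment so the snippet compiles without the Kafka jars on the classpath.

```java
// Hypothetical deterministic partitioner for Kafka 0.7. With the Kafka
// jars present it would declare:
//     implements kafka.producer.Partitioner<String>
// and be registered via props.put("partitioner.class", "KeyModPartitioner").
public class KeyModPartitioner {

    // The 0.7 Partitioner contract: map a key into [0, numPartitions).
    public int partition(String key, int numPartitions) {
        return toPartition(key, numPartitions);
    }

    // Mask off the sign bit instead of using Math.abs():
    // Math.abs(Integer.MIN_VALUE) is still negative, which would
    // produce an invalid partition id.
    static int toPartition(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition, so messages
        // sharing a key stay in order; with numPartitions=1 every key
        // maps to partition 0.
        System.out.println(toPartition("sequence", 1));  // always 0
        System.out.println(toPartition("sequence", 4)
                == toPartition("sequence", 4));          // always true
    }
}
```

With such a partitioner configured, the keyed ProducerData constructor Neha mentions — roughly new ProducerData<String, String>(topic, key, messages) — pins all messages that share a key to one partition.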
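Jun's workaround can be folded directly into the producer properties from Anuj's original post. The snippet below is only standard java.util.Properties wiring (no Kafka classes are needed to illustrate it); the property names are the ones already discussed in this thread, and the class name is made up.

```java
import java.util.Properties;

public class OrderedProducerProps {

    // Build the producer config from the thread, plus Jun's suggestion:
    // push reconnect.interval to Integer.MAX_VALUE so the producer never
    // tears down its socket, closing the reordering window he describes.
    static Properties build() {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("zk.connect", "localhost:2181");
        props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("reconnect.interval"));
        // prints 2147483647
    }
}
```

These properties would then feed new ProducerConfig(props) exactly as in the original snippet. The trade-off, as Anuj notes, is that a long-lived socket is never refreshed; that is acceptable when strict ordering matters more than periodic reconnection.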