Anuj,

Here are the API docs for the Producer -
http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/docs/

The API docs show that send() takes a ProducerData object, which
wraps a topic, a list of messages, and an optional key.
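
The key is what keeps related messages together. As a rough, self-contained
sketch of the idea (hash-and-modulo, the same shape a partitioner.class
implementation takes; the class and method names below are made up for
illustration, not the Kafka 0.7 API):

```java
// Illustrative only: shows why the same key always lands in the same
// partition, which is what preserves per-key ordering.
public class KeyedPartitionerSketch {

    // Map a key to a stable partition id in [0, numPartitions).
    static int partition(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        int p1 = partition("number-stream", numPartitions);
        int p2 = partition("number-stream", numPartitions);
        System.out.println(p1 == p2); // prints true: same key, same partition
    }
}
```

Since the mapping is deterministic, every message sent with the same key goes
to the same partition, and Kafka's per-partition ordering then applies to the
whole keyed stream.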

When you say you've set the number of partitions to 1, do you mean
you have one broker with num.partitions=1?
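
(Assuming you meant the broker-side setting, the relevant line in the
broker's server.properties would be something like:)

```properties
# server.properties (broker side) -- one partition per topic on this broker
num.partitions=1
```

With a single broker and num.partitions=1, the whole topic is one partition,
so per-partition ordering covers the entire stream.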

Thanks,
Neha

On Mon, Feb 27, 2012 at 11:56 PM, Anuj Kumar <anujs...@gmail.com> wrote:
> Thanks Jun and Neha.
>
> Setting reconnect.interval to a higher value works, but this doesn't look
> like a good solution to me. Why do we have reconnect.interval at all? If
> the socket is already active and sending data, why do we want to
> disconnect it and reconnect? I am trying to understand the use case where
> reconnect.interval is useful.
>
> Regarding the partition key, we already have set the number of partitions
> to 1, so do we need to explicitly use that ProducerData constructor?
>
> Thanks for your help.
>
> Regards,
> Anuj
>
> On Mon, Feb 27, 2012 at 11:31 PM, Neha Narkhede 
> <neha.narkh...@gmail.com>wrote:
>
>> >> Further, we looked into the quick start guide of Kafka to produce data
>> >> to the same partition, but the constructor mentioned in point 6 that
>> >> takes an extra parameter for the partition key is not available in
>> >> kafka-0.7. Am I missing something here?
>>
>> Instead of that, the ProducerData object takes in a key for a set of
>> messages. It uses the "partitioner.class" to pick a partition id. The
>> default partitioner picks a partition id at random. You can also plug
>> in your own custom partitioner if you want a different partitioning
>> scheme. Messages are delivered in order per partition.
>>
>> Thanks,
>> Neha
>>
>> On Mon, Feb 27, 2012 at 9:43 AM, Jun Rao <jun...@gmail.com> wrote:
>> > Anuj,
>> >
>> > Sequencing should be guaranteed within a partition. The problem you saw
>> > could be related to reconnect.interval, which controls how frequently
>> > the producer gets a new socket. Every time a producer gets a new socket,
>> > there is a small window in which events sent over the old socket and
>> > over the new one may arrive out of order. You can set reconnect.interval
>> > to Integer.MAX_VALUE to avoid new sockets being created.
>> >
>> > Thanks,
>> >
>> > Jun
>> >
>> > On Sun, Feb 26, 2012 at 8:06 PM, Anuj Kumar <anujs...@gmail.com> wrote:
>> >
>> >> Hello Everyone,
>> >>
>> >> We are using Kafka-0.7.0 to push a sequence of numbers and we want to
>> >> preserve the exact sequence in which they are produced by the producer.
>> >> We wrote a sample producer that simply produces a million points in a
>> >> sequence. The code snippet is-
>> >>
>> >> int t = 0;
>> >> while (t <= 1000000)
>> >> {
>> >>     producer.send(new ProducerData<String, String>(topic, Integer.toString(t)));
>> >>     ++t;
>> >> }
>> >>
>> >> where, the producer is initialized as-
>> >>
>> >> Properties props = new Properties();
>> >> props.put("serializer.class", "kafka.serializer.StringEncoder");
>> >> props.put("zk.connect", "localhost:2181");
>> >> producer = new Producer<String, String>(new ProducerConfig(props));
>> >>
>> >> and topic is taken as an input to the constructor of the Producer class.
>> >>
>> >> On the consumer side, our code looks like-
>> >>
>> >> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>> >> topicCountMap.put(topic, new Integer(1));
>> >> Map<String, List<KafkaMessageStream<Message>>> consumerMap =
>> >> consumer.createMessageStreams(topicCountMap);
>> >> KafkaMessageStream<Message> stream = consumerMap.get(topic).get(0);
>> >> ConsumerIterator<Message> it = stream.iterator();
>> >> Integer prev = null;
>> >> while(it.hasNext())
>> >> {
>> >>     String msg = getMessage(it.next());
>> >>     int current = Integer.parseInt(msg);
>> >>     ...
>> >> }
>> >>
>> >> The consumer is initialized as-
>> >>
>> >> public SimpleKafkaQueueConsumer(String _topic)
>> >> {
>> >>    topic = _topic;
>> >>    consumer =
>> >>        Consumer.createJavaConsumerConnector(createConsumerConfig());
>> >> }
>> >>
>> >> private static ConsumerConfig createConsumerConfig()
>> >> {
>> >>    Properties props = new Properties();
>> >>    props.put("zk.connect", "localhost:2181");
>> >>    props.put("groupid", "simpleconsumer");
>> >>    return new ConsumerConfig(props);
>> >> }
>> >>
>> >> We started both the producer and consumer with the same topic, but we
>> >> noticed that on the consumer side the sequence of the numbers changes
>> >> (only for some of the numbers). We would like the sequence to be
>> >> exactly the same.
>> >>
>> >> We initially thought that it might be an issue with the data being
>> >> stored in different partitions. Is that the case?
>> >>
>> >> Further, we looked into the quick start guide of Kafka to produce data
>> >> to the same partition, but the constructor mentioned in point 6 that
>> >> takes an extra parameter for the partition key is not available in
>> >> kafka-0.7. Am I missing something here?
>> >>
>> >> Is there a way to preserve the sequence?
>> >>
>> >> Regards,
>> >> Anuj
>> >>
>>
