Hello Everyone,

We are using Kafka 0.7.0 to push a sequence of numbers, and we want to
preserve the exact order in which the producer emits them. We wrote a
sample producer that simply produces a million points in sequence. The
code snippet is:

int t = 0;
while (t <= 1000000)
{
    producer.send(new ProducerData<String, String>(topic, Integer.toString(t)));
    ++t;
}

where the producer is initialized as:

Properties props = new Properties();
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("zk.connect", "localhost:2181");
producer = new Producer<String, String>(new ProducerConfig(props));

and topic is taken as an input to the constructor of the Producer class.

On the consumer side, our code looks like:

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaMessageStream<Message>>> consumerMap =
    consumer.createMessageStreams(topicCountMap);
KafkaMessageStream<Message> stream = consumerMap.get(topic).get(0);
ConsumerIterator<Message> it = stream.iterator();
Integer prev = null;
while (it.hasNext())
{
    String msg = getMessage(it.next());
    int current = Integer.parseInt(msg);
    ...
}

The consumer is initialized as:

public SimpleKafkaQueueConsumer(String _topic)
{
    topic = _topic;
    consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
}

private static ConsumerConfig createConsumerConfig()
{
    Properties props = new Properties();
    props.put("zk.connect", "localhost:2181");
    props.put("groupid", "simpleconsumer");
    return new ConsumerConfig(props);
}

We started both the producer and the consumer with the same topic, but we
noticed that on the consumer side the order of the numbers changes (only
for some of them). We would like the order to be exactly the same.
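
For reference, the kind of check that reveals the reordering can be
sketched as below. This is a standalone illustration, not our actual
consumer loop: the class OrderCheck and the method countOutOfOrder are
hypothetical names, and the input is a plain list of already-decoded
message strings instead of a Kafka stream.

```java
import java.util.Arrays;
import java.util.List;

public class OrderCheck {
    // Counts the messages whose value is not exactly (previous value + 1).
    // Hypothetical helper for illustration; it does not touch Kafka at all.
    public static int countOutOfOrder(List<String> messages) {
        Integer prev = null;
        int outOfOrder = 0;
        for (String msg : messages) {
            int current = Integer.parseInt(msg);
            if (prev != null && current != prev + 1) {
                outOfOrder++;
            }
            prev = current;
        }
        return outOfOrder;
    }

    public static void main(String[] args) {
        // A made-up received sequence in which 2 and 3 arrive swapped.
        List<String> received = Arrays.asList("0", "1", "3", "2", "4");
        System.out.println("breaks in sequence: " + countOutOfOrder(received));
    }
}
```

A result of zero means every number arrived directly after its
predecessor; anything else means the sequence was broken somewhere.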

We initially thought that it might be an issue with the data being stored
in different partitions. Is that the case?
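
To make the suspicion concrete, here is a minimal in-memory sketch with no
Kafka API involved. It assumes that messages are spread round-robin over
the partitions and that the consumer drains the partitions one after
another; both are simplifying assumptions on our part, not the documented
behavior of Kafka 0.7. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionOrderSketch {
    // Spreads 0..n-1 over in-memory "partitions" (round-robin is an assumption).
    public static List<List<Integer>> produce(int n, int partitions) {
        List<List<Integer>> logs = new ArrayList<List<Integer>>();
        for (int p = 0; p < partitions; p++) {
            logs.add(new ArrayList<Integer>());
        }
        for (int t = 0; t < n; t++) {
            logs.get(t % partitions).add(t);
        }
        return logs;
    }

    // Reads each partition fully, one after another (also an assumption).
    public static List<Integer> consume(List<List<Integer>> logs) {
        List<Integer> out = new ArrayList<Integer>();
        for (List<Integer> log : logs) {
            out.addAll(log);
        }
        return out;
    }

    // True if the list is in non-decreasing order.
    public static boolean isSorted(List<Integer> xs) {
        for (int i = 1; i < xs.size(); i++) {
            if (xs.get(i - 1) > xs.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<List<Integer>> logs = produce(10, 2);
        System.out.println("consumed:         " + consume(logs));
        System.out.println("globally ordered: " + isSorted(consume(logs)));
        System.out.println("partition 0 ok:   " + isSorted(logs.get(0)));
    }
}
```

In this toy model each partition stays ordered on its own, but the merged
stream does not, which matches the symptom we see. With a single partition
the merged stream stays ordered.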

Further, we looked at the Kafka quick start guide for a way to produce
data to a single partition, but the constructor mentioned in point 6,
which takes an extra parameter for the partition key, is not available in
Kafka 0.7. Am I missing something here?

Is there a way to preserve the sequence?

Regards,
Anuj
