Juan,

Typically, what you do is to call createMessageStreams only once and pass
each stream to a separate thread for iteration.

Thanks,

Jun


On Thu, May 3, 2012 at 10:10 AM, Juan Moreno
<jwellington.mor...@gmail.com>wrote:

> Hi there,
>
> First let me describe the desired result.
>
> I would like to have multiple threads subsribe to a particular topic under
> one group_id.
> I know that the following example demonstrates how to do this:
>
> // create 4 partitions of the stream for topic “test”, to allow 4 threads
> > to consume
> > Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
> > consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));
> > List<KafkaMessageStream<Message>> streams =
> > topicMessageStreams.get("test");
> > // create list of 4 threads to consume from each of the partitions
> > ExecutorService executor = Executors.newFixedThreadPool(4);
> > // consume the messages in the threads
> > for(final KafkaMessageStream<Message> stream: streams) {
> > executor.submit(new Runnable() {
> > public void run() {
> > for(Message message: stream) {
> > // process message
> > }
> > }
> > });
>
> BUT I would like to be able to create the connection to the broker from
> WITHIN each thread. Is this possible?
>
> For example, each thread does the following.
>
>  creates createJavaConsumerConnector(consumerConfig);
> > Map<String, Integer> topic_count = new HashMap<String, Integer>();
> >         topic_count.put(topic, new Integer(1));
> >         Map<String, List<KafkaMessageStream<Message>>> consumer_map =
> > consumerConnector
> >                 .createMessageStreams(topic_count);
> >
> >         KafkaMessageStream<Message> stream =
> > consumer_map.get(topic).get(0);
> >
> >         ConsumerIterator<Message> iter = stream.iterator();
> >
> >         while(iter.hasNext())
> >         {
> >             System.out.println(KafkaUtils.getString(iter.next()));
> >         }
>
>
> Rather than partitioning before the threads are started?
>
> Thank you so much
> --
> Juan Wellington Moreno
> *Software Engineer*
>

Reply via email to