Juan, Typically, what you do is to call createMessageStreams only once and pass each stream to a separate thread for iteration.
Thanks, Jun On Thu, May 3, 2012 at 10:10 AM, Juan Moreno <jwellington.mor...@gmail.com>wrote: > Hi there, > > First let me describe the desired result. > > I would like to have multiple threads subsribe to a particular topic under > one group_id. > I know that the following example demonstrates how to do this: > > // create 4 partitions of the stream for topic “test”, to allow 4 threads > > to consume > > Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams = > > consumerConnector.createMessageStreams(ImmutableMap.of("test", 4)); > > List<KafkaMessageStream<Message>> streams = > > topicMessageStreams.get("test"); > > // create list of 4 threads to consume from each of the partitions > > ExecutorService executor = Executors.newFixedThreadPool(4); > > // consume the messages in the threads > > for(final KafkaMessageStream<Message> stream: streams) { > > executor.submit(new Runnable() { > > public void run() { > > for(Message message: stream) { > > // process message > > } > > } > > }); > > BUT I would like to be able to create the connection to the broker from > WITHIN each thread. Is this possible? > > For example, each thread does the following. > > creates createJavaConsumerConnector(consumerConfig); > > Map<String, Integer> topic_count = new HashMap<String, Integer>(); > > topic_count.put(topic, new Integer(1)); > > Map<String, List<KafkaMessageStream<Message>>> consumer_map = > > consumerConnector > > .createMessageStreams(topic_count); > > > > KafkaMessageStream<Message> stream = > > consumer_map.get(topic).get(0); > > > > ConsumerIterator<Message> iter = stream.iterator(); > > > > while(iter.hasNext()) > > { > > System.out.println(KafkaUtils.getString(iter.next())); > > } > > > Rather than partitioning before the threads are started? > > Thank you so much > -- > Juan Wellington Moreno > *Software Engineer* >