Hi there, First let me describe the desired result.
I would like to have multiple threads subsribe to a particular topic under one group_id. I know that the following example demonstrates how to do this: // create 4 partitions of the stream for topic “test”, to allow 4 threads > to consume > Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams = > consumerConnector.createMessageStreams(ImmutableMap.of("test", 4)); > List<KafkaMessageStream<Message>> streams = > topicMessageStreams.get("test"); > // create list of 4 threads to consume from each of the partitions > ExecutorService executor = Executors.newFixedThreadPool(4); > // consume the messages in the threads > for(final KafkaMessageStream<Message> stream: streams) { > executor.submit(new Runnable() { > public void run() { > for(Message message: stream) { > // process message > } > } > }); BUT I would like to be able to create the connection to the broker from WITHIN each thread. Is this possible? For example, each thread does the following. creates createJavaConsumerConnector(consumerConfig); > Map<String, Integer> topic_count = new HashMap<String, Integer>(); > topic_count.put(topic, new Integer(1)); > Map<String, List<KafkaMessageStream<Message>>> consumer_map = > consumerConnector > .createMessageStreams(topic_count); > > KafkaMessageStream<Message> stream = > consumer_map.get(topic).get(0); > > ConsumerIterator<Message> iter = stream.iterator(); > > while(iter.hasNext()) > { > System.out.println(KafkaUtils.getString(iter.next())); > } Rather than partitioning before the threads are started? Thank you so much -- Juan Wellington Moreno *Software Engineer*