Hi there,

First let me describe the desired result.

I would like to have multiple threads subsribe to a particular topic under
one group_id.
I know that the following example demonstrates how to do this:

// create 4 partitions of the stream for topic “test”, to allow 4 threads
> to consume
> Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
> consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));
> List<KafkaMessageStream<Message>> streams =
> topicMessageStreams.get("test");
> // create list of 4 threads to consume from each of the partitions
> ExecutorService executor = Executors.newFixedThreadPool(4);
> // consume the messages in the threads
> for(final KafkaMessageStream<Message> stream: streams) {
> executor.submit(new Runnable() {
> public void run() {
> for(Message message: stream) {
> // process message
> }
> }
> });

BUT I would like to be able to create the connection to the broker from
WITHIN each thread. Is this possible?

For example, each thread does the following.

 creates createJavaConsumerConnector(consumerConfig);
> Map<String, Integer> topic_count = new HashMap<String, Integer>();
>         topic_count.put(topic, new Integer(1));
>         Map<String, List<KafkaMessageStream<Message>>> consumer_map =
> consumerConnector
>                 .createMessageStreams(topic_count);
>
>         KafkaMessageStream<Message> stream =
> consumer_map.get(topic).get(0);
>
>         ConsumerIterator<Message> iter = stream.iterator();
>
>         while(iter.hasNext())
>         {
>             System.out.println(KafkaUtils.getString(iter.next()));
>         }


Rather than partitioning before the threads are started?

Thank you so much
-- 
Juan Wellington Moreno
*Software Engineer*

Reply via email to