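Could you wrap the code in the consumer's run() in a try/catch and log all
Throwables? Executors can swallow exceptions: a task passed to submit() stores
anything it throws in the returned Future, so nothing is logged unless get()
is called.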
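
A minimal sketch of that suggestion, with no Kafka dependency so it can be run on its own. The class name, the consumeLoop() stand-in, the logged() helper, and the in-memory LOG list are all hypothetical names made up for illustration; in the real consumer the wrapped body would be the loop over the stream.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SwallowedExceptionDemo {

    // Stand-in for a real logger so the effect is easy to inspect.
    static final List<String> LOG = Collections.synchronizedList(new ArrayList<String>());

    // Hypothetical stand-in for the consumer loop: it fails right away.
    static void consumeLoop() {
        throw new IllegalStateException("simulated consumer failure");
    }

    // Wraps a task so any Throwable is recorded before the worker gives up.
    static Runnable logged(final Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (final Throwable t) {
                LOG.add("consumer thread died: " + t);
            }
        };
    }

    public static void main(final String[] args) throws InterruptedException, ExecutionException {
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        final Runnable failing = SwallowedExceptionDemo::consumeLoop;

        // Unwrapped: submit() stores the exception in the Future and prints nothing.
        final Future<?> silent = executor.submit(failing);
        try {
            silent.get();
        } catch (final ExecutionException e) {
            System.out.println("only visible through Future.get(): " + e.getCause());
        }

        // Wrapped: the failure is recorded by the task itself.
        final Future<?> wrapped = executor.submit(logged(failing));
        wrapped.get(); // completes normally because the wrapper caught the Throwable
        System.out.println(LOG);

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

If the consumer threads are dying this way, the wrapped version should surface the cause in the log instead of leaving the application silently idle.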

Thanks,

Jun

On Tue, May 8, 2012 at 4:08 PM, lessonz <lessonz.leg...@gmail.com> wrote:

> After trying and failing to get a more complicated consumer working, I
> decided to start at square one and use the example code. Below is my barely
> modified implementation:
>
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.logging.Logger;
>
> import kafka.consumer.Consumer;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.KafkaMessageStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.message.Message;
> import kafka.serializer.DefaultDecoder;
>
> public class KafkaTestConsumer {
>
>    /**
>     * @param args
>     */
>    public static void main(final String[] args) {
>        final Logger logger = Logger.getLogger("KafkaTestConsumer");
>
>        // specify some consumer properties
>        final Properties props = new Properties();
>        props.put("zk.connect", "testserver:2181");
>        props.put("zk.connectiontimeout.ms", "1000000");
>        props.put("groupid", "test_group");
>
>        // Create the connection to the cluster
>        final ConsumerConfig consumerConfig = new ConsumerConfig(props);
>        final ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
>
>        // create 4 partitions of the stream for topic "testTopic", to allow 4
>        // threads to consume
>        final String topicName = "testTopic";
>        final int numStreams = 4;
>        List<KafkaMessageStream<Message>> streams = null;
>        try {
>            final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams = consumerConnector
>                    .createMessageStreams(Collections.singletonMap(topicName, numStreams), new DefaultDecoder());
>            streams = topicMessageStreams.get(topicName);
>        } catch (final Exception e) {
>            logger.severe(e.getMessage());
>        }
>
>        // create list of 4 threads to consume from each of the partitions
>        final ExecutorService executor = Executors.newFixedThreadPool(numStreams);
>
>        // consume the messages in the threads
>        for (final KafkaMessageStream<Message> stream : streams) {
>            executor.submit(new Runnable() {
>                @Override
>                public void run() {
>                    for (final Message message : stream) {
>                        logger.severe("!");
>                        logger.severe(message.toString());
>                    }
>                }
>            });
>        }
>    }
>
> }
>
> It runs, I get no errors, and nothing happens. I don't get any messages. I
> don't THINK it's an issue with my Kafka install for two reasons: 1.
> Zookeeper logs my client connection. 2. (Granted it's all on localhost but)
> When I used the console consumer and producer on the instance, they seem to
> work just fine.
>
> Methodology is to start Zookeeper, start Kafka, start above application,
> and then connect a console producer to generate messages. I'm really at a
> loss as to what's happening. Interestingly, if I put in breakpoints, I seem
> to lose a handle as I eventually lose the ability to step over, step into,
> and so on.
>
> I'd really appreciate any input.
>
> Cheers.
>
