After trying and failing to get a more complicated consumer working, I
decided to start at square one and use the example code. Below is my barely
modified implementation:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Logger;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaMessageStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.serializer.DefaultDecoder;

public class KafkaTestConsumer {

    /**
     * @param args
     */
    public static void main(final String[] args) {
        final Logger logger = Logger.getLogger("KafkaTestConsumer");

        // specify some consumer properties
        final Properties props = new Properties();
        props.put("zk.connect", "testserver:2181");
        props.put("zk.connectiontimeout.ms", "1000000");
        props.put("groupid", "test_group");

        // Create the connection to the cluster
        final ConsumerConfig consumerConfig = new ConsumerConfig(props);
        final ConsumerConnector consumerConnector =
                Consumer.createJavaConsumerConnector(consumerConfig);

        // create 4 partitions of the stream for topic "testTopic",
        // to allow 4 threads to consume
        final String topicName = "testTopic";
        final int numStreams = 4;
        List<KafkaMessageStream<Message>> streams = null;
        try {
            final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
                    consumerConnector.createMessageStreams(
                            Collections.singletonMap(topicName, numStreams),
                            new DefaultDecoder());
            streams = topicMessageStreams.get(topicName);
        } catch (final Exception e) {
            logger.severe(e.getMessage());
        }

        // create list of 4 threads to consume from each of the partitions
        final ExecutorService executor = Executors.newFixedThreadPool(numStreams);

        // consume the messages in the threads
        for (final KafkaMessageStream<Message> stream : streams) {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    for (final Message message : stream) {
                        logger.severe("!");
                        logger.severe(message.toString());
                    }
                }
            });
        }
    }

}

It runs, I get no errors, and nothing happens: I never receive any messages.
I don't THINK it's an issue with my Kafka install, for two reasons:

1. Zookeeper logs my client connection.
2. When I use the console consumer and producer on the same instance
   (granted, it's all on localhost), they seem to work just fine.

My methodology is to start Zookeeper, start Kafka, start the above
application, and then connect a console producer to generate messages. I'm
really at a loss as to what's happening. Interestingly, if I set
breakpoints, I seem to lose the handle: I eventually lose the ability to
step over, step into, and so on.

I'd really appreciate any input.

Cheers.
