After trying and failing to get a more complicated consumer working, I decided to start at square one and use the example code. Below is my barely modified implementation:
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.logging.Logger;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaMessageStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.Message;
    import kafka.serializer.DefaultDecoder;

    public class KafkaTestConsumer {

        /**
         * @param args
         */
        public static void main(final String[] args) {
            final Logger logger = Logger.getLogger("KafkaTestConsumer");

            // specify some consumer properties
            final Properties props = new Properties();
            props.put("zk.connect", "testserver:2181");
            props.put("zk.connectiontimeout.ms", "1000000");
            props.put("groupid", "test_group");

            // Create the connection to the cluster
            final ConsumerConfig consumerConfig = new ConsumerConfig(props);
            final ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

            // create 4 partitions of the stream for topic "testTopic", to allow 4
            // threads to consume
            final String topicName = "testTopic";
            final int numStreams = 4;
            List<KafkaMessageStream<Message>> streams = null;
            try {
                final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams = consumerConnector
                        .createMessageStreams(Collections.singletonMap(topicName, numStreams), new DefaultDecoder());
                streams = topicMessageStreams.get(topicName);
            } catch (final Exception e) {
                logger.severe(e.getMessage());
            }

            // create list of 4 threads to consume from each of the partitions
            final ExecutorService executor = Executors.newFixedThreadPool(numStreams);

            // consume the messages in the threads
            for (final KafkaMessageStream<Message> stream : streams) {
                executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        for (final Message message : stream) {
                            logger.severe("!");
                            logger.severe(message.toString());
                        }
                    }
                });
            }
        }
    }

It runs, I get no errors, and nothing happens.
I don't get any messages. I don't THINK it's an issue with my Kafka install, for two reasons:

1. Zookeeper logs my client connection.
2. Granted, it's all on localhost, but when I use the console consumer and producer on the same instance, they seem to work just fine.

My methodology is to start Zookeeper, start Kafka, start the application above, and then connect a console producer to generate messages. I'm really at a loss as to what's happening. Interestingly, if I put in breakpoints, I seem to lose my handle on the process: eventually I can no longer step over, step into, and so on.

I'd really appreciate any input. Cheers.
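
P.S. One thing I can't rule out from "no errors" alone: as far as I understand it, `ExecutorService.submit` stores any exception thrown inside `run()` in the returned `Future` instead of printing it, so a failing consumer thread could look exactly like a silent one. Below is a minimal, JDK-only sketch of how that could be checked. The class name `SubmitExceptionDemo`, the helper `failureOf`, and the simulated failure are all hypothetical stand-ins and not part of my consumer or of Kafka.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitExceptionDemo {

    // Hypothetical helper: runs the task through submit() and returns the
    // message of the exception it threw, or null if it finished normally.
    static String failureOf(final Runnable task) {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            final Future<?> future = executor.submit(task);
            future.get(); // blocks until the task ends; rethrows its failure wrapped
            return null;
        } catch (final ExecutionException e) {
            // The original exception from run() is available as the cause.
            return e.getCause().getMessage();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executor.shutdown();
        }
    }

    public static void main(final String[] args) {
        // Stand-in for a consumer loop that fails right away (an assumption
        // for illustration; I have not confirmed my consumer does this).
        final Runnable failing = new Runnable() {
            @Override
            public void run() {
                throw new IllegalStateException("simulated consumer failure");
            }
        };
        // prints "failure: simulated consumer failure"
        System.out.println("failure: " + failureOf(failing));
    }
}
```

In the real consumer the loop is not supposed to end, so a plain `get()` would block forever when things are working; wrapping the body of `run()` in a try/catch that logs the exception, or calling `get` with a timeout, would probably be the more practical variant.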