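Could you wrap the body of the consumer's run() in a try/catch and log every Throwable? Executors can eat exceptions: when a task is handed to submit(), anything it throws is stored in the returned Future and never printed unless something calls Future.get().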
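Something along these lines might help. It is only a sketch: the class name LoggingRunnableDemo and the helper logged() are made up for illustration and are not part of Kafka, and the task that throws stands in for the loop over the stream.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LoggingRunnableDemo {

    private static final Logger LOGGER = Logger.getLogger("LoggingRunnableDemo");

    /**
     * Hypothetical helper: wraps a task so that any Throwable it raises is
     * logged (and recorded in 'seen') instead of vanishing into the Future.
     */
    public static Runnable logged(final Runnable task, final List<Throwable> seen) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    task.run();
                } catch (final Throwable t) {
                    seen.add(t);
                    LOGGER.log(Level.SEVERE, "Consumer thread died", t);
                }
            }
        };
    }

    /** Submits a deliberately failing task through the wrapper and returns what was caught. */
    public static List<Throwable> runFailingTask() {
        final List<Throwable> seen = new CopyOnWriteArrayList<Throwable>();
        final ExecutorService executor = Executors.newFixedThreadPool(1);

        // Stand-in for the consumer loop; the real run() would iterate the stream here.
        executor.submit(logged(new Runnable() {
            @Override
            public void run() {
                throw new IllegalStateException("simulated failure inside run()");
            }
        }, seen));

        // Let the submitted task finish before inspecting the result.
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(final String[] args) {
        System.out.println("caught: " + runFailingTask());
    }
}
```

In your consumer, the Runnable passed to executor.submit() would go through the same kind of wrapper, so a failure inside the stream loop shows up in the log instead of disappearing.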
Thanks,

Jun

On Tue, May 8, 2012 at 4:08 PM, lessonz <lessonz.leg...@gmail.com> wrote:

> After trying and failing to get a more complicated consumer working, I
> decided to start at square one and use the example code. Below is my barely
> modified implementation:
>
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.logging.Logger;
>
> import kafka.consumer.Consumer;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.KafkaMessageStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.message.Message;
> import kafka.serializer.DefaultDecoder;
>
> public class KafkaTestConsumer {
>
>     /**
>      * @param args
>      */
>     public static void main(final String[] args) {
>         final Logger logger = Logger.getLogger("KafkaTestConsumer");
>
>         // specify some consumer properties
>         final Properties props = new Properties();
>         props.put("zk.connect", "testserver:2181");
>         props.put("zk.connectiontimeout.ms", "1000000");
>         props.put("groupid", "test_group");
>
>         // Create the connection to the cluster
>         final ConsumerConfig consumerConfig = new ConsumerConfig(props);
>         final ConsumerConnector consumerConnector =
>                 Consumer.createJavaConsumerConnector(consumerConfig);
>
>         // create 4 partitions of the stream for topic “testTopic”, to allow 4
>         // threads to consume
>         final String topicName = "testTopic";
>         final int numStreams = 4;
>         List<KafkaMessageStream<Message>> streams = null;
>         try {
>             final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams = consumerConnector
>                     .createMessageStreams(Collections.singletonMap(topicName, numStreams), new DefaultDecoder());
>             streams = topicMessageStreams.get(topicName);
>         } catch (final Exception e) {
>             logger.severe(e.getMessage());
>         }
>
>         // create list of 4 threads to consume from each of the partitions
>         final ExecutorService executor = Executors.newFixedThreadPool(numStreams);
>
>         // consume the messages in the threads
>         for (final KafkaMessageStream<Message> stream : streams) {
>             executor.submit(new Runnable() {
>                 @Override
>                 public void run() {
>                     for (final Message message : stream) {
>                         logger.severe("!");
>                         logger.severe(message.toString());
>                     }
>                 }
>             });
>         }
>     }
>
> }
>
> It runs, I get no errors, and nothing happens. I don't get any messages. I
> don't THINK it's an issue with my Kafka install for two reasons: 1.
> Zookeeper logs my client connection. 2. (Granted it's all on localhost but)
> When I used the console consumer and producer on the instance, they seem to
> work just fine.
>
> Methodology is to start Zookeeper, start Kafka, start above application,
> and then connect a console produce to generate messages. I'm really at a
> loss as to what's happening. Interestingly, if I put in breakpoints, I seem
> to lose a handle as I eventually lose the ability to step over, step into,
> and so on.
>
> I'd really appreciate any input.
>
> Cheers.