Okay, here is the amended code: import java.util.Calendar; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.logging.Logger;
import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaMessageStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.Message; import kafka.serializer.DefaultDecoder; public class KafkaTestConsumer { /** * @param args */ public static void main(final String[] args) { try { final Logger logger = Logger.getLogger("KafkaTestConsumer"); // specify some consumer properties final Properties props = new Properties(); props.put("zk.connect", "testserver:2181"); props.put("zk.connectiontimeout.ms", "1000000"); props.put("groupid", "test_group"); // Create the connection to the cluster final ConsumerConfig consumerConfig = new ConsumerConfig(props); final ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); // create 4 partitions of the stream for topic “testTopic”, to allow // 4 // threads to consume final String topicName = "testTopic"; final int numStreams = 4; List<KafkaMessageStream<Message>> streams = null; try { final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams = consumerConnector .createMessageStreams(Collections.singletonMap(topicName, numStreams), new DefaultDecoder()); streams = topicMessageStreams.get(topicName); } catch (final Exception e) { logger.severe(e.getMessage()); } // create list of 4 threads to consume from each of the partitions final ExecutorService executor = Executors.newFixedThreadPool(numStreams); // consume the messages in the threads for (final KafkaMessageStream<Message> stream : streams) { executor.submit(new Runnable() { @Override public void run() { try { while (true) { logger.severe(Calendar.getInstance().getTime().toString()); if (stream == null) { logger.severe("stream is NULL."); } else { logger.severe("stream = " + stream); for (final Message message : stream) { logger.severe("!"); logger.severe(message.toString()); } } } } catch (final Throwable t) { logger.severe("In run " + t.getMessage()); } finally { if (stream == null) { logger.severe("stream is NULL."); } else { logger.severe("stream = " + stream); } } } }); } } catch (final Throwable t) { System.err.println("In main" + t.getMessage()); } } } Interesting things happen. I get the time printed out only once for each stream. If I use eclipse's debugging and a breakpoint on the line "if (stream == null) {" in the "while (true) {" loop, the variable stream says "<error(s)_during_the_evaluation>" for value. If I step over this line, I'm taken into the else clause, but the logger is never executed, and seems to die when referencing the stream value. So, I got to thinking maybe the problem is a missing dependency somewhere or maybe a conflict. So, here are the dependencies I have in that project's pom (the project has other pieces): <dependencies> <!-- Import the JMS API, we use provided scope as the API is included in JBoss AS 7 --> <dependency> <groupId>org.jboss.spec.javax.jms</groupId> <artifactId>jboss-jms-api_1.1_spec</artifactId> <scope>provided</scope> </dependency> <!-- Import the JCA API, we use provided scope as the API is included in JBoss AS 7 --> <dependency> <groupId>org.jboss.spec.javax.resource</groupId> <artifactId>jboss-connector-api_1.6_spec</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.incubator.kafka</groupId> <artifactId>kafka</artifactId> <version>0.7.0-incubating</version> </dependency> </dependencies> And here is the pom I'm using for kafka: <?xml version="1.0" encoding="UTF-8"?> <project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns=" http://maven.apache.org/POM/4.0.0" xmlns:xsi=" http://www.w3.org/2001/XMLSchema-instance"> <modelVersion>4.0.0</modelVersion> <groupId>org.apache.incubator.kafka</groupId> <artifactId>kafka</artifactId> <packaging>pom</packaging> <version>0.7.0-incubating</version> <name>Apache Kafka</name> <description>Apache Kafka is a distributed publish-subscribe messaging system</description> <url>http://incubator.apache.org/kafka</url> <inceptionYear>2012</inceptionYear> <licenses> <license> <name>The Apache Software License, Version 2.0</name> <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> </license> </licenses> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.8.0</version> </dependency> <dependency> <groupId>com.github.sgroschupf</groupId> <artifactId>zkclient</artifactId> <version>0.1</version> </dependency> </dependencies> <scm> <connection>scm:svn: http://svn.apache.org/repos/asf/incubator/kafka/trunk</connection> <developerConnection>scm:svn: http://svn.apache.org/repos/asf/incubator/kafka/trunk</developerConnection> <url>http://svn.apache.org/repos/asf/incubator/kafka/trunk</url> </scm> </project> Again, any input would be greatly appreciated. Thanks. On Tue, May 8, 2012 at 6:34 PM, Jun Rao <jun...@gmail.com> wrote: > Could you put consumer run() code in try/catch and log all throwables? > Executors can eat exceptions. > > Thanks, > > Jun > > On Tue, May 8, 2012 at 4:08 PM, lessonz <lessonz.leg...@gmail.com> wrote: > > > After trying and failing to get a more complicated consumer working, I > > decided to start at square one and use the example code. Below is my > barely > > modified implementation: > > > > import java.util.Collections; > > import java.util.List; > > import java.util.Map; > > import java.util.Properties; > > import java.util.concurrent.ExecutorService; > > import java.util.concurrent.Executors; > > import java.util.logging.Logger; > > > > import kafka.consumer.Consumer; > > import kafka.consumer.ConsumerConfig; > > import kafka.consumer.KafkaMessageStream; > > import kafka.javaapi.consumer.ConsumerConnector; > > import kafka.message.Message; > > import kafka.serializer.DefaultDecoder; > > > > public class KafkaTestConsumer { > > > > /** > > * @param args > > */ > > public static void main(final String[] args) { > > final Logger logger = Logger.getLogger("KafkaTestConsumer"); > > > > // specify some consumer properties > > final Properties props = new Properties(); > > props.put("zk.connect", "testserver:2181"); > > props.put("zk.connectiontimeout.ms", "1000000"); > > props.put("groupid", "test_group"); > > > > // Create the connection to the cluster > > final ConsumerConfig consumerConfig = new ConsumerConfig(props); > > final ConsumerConnector consumerConnector = > > Consumer.createJavaConsumerConnector(consumerConfig); > > > > // create 4 partitions of the stream for topic “testTopic”, to > > allow 4 > > // threads to consume > > final String topicName = "testTopic"; > > final int numStreams = 4; > > List<KafkaMessageStream<Message>> streams = null; > > try { > > final Map<String, List<KafkaMessageStream<Message>>> > > topicMessageStreams = consumerConnector > > > > .createMessageStreams(Collections.singletonMap(topicName, numStreams), > new > > DefaultDecoder()); > > streams = topicMessageStreams.get(topicName); > > } catch (final Exception e) { > > logger.severe(e.getMessage()); > > } > > > > // create list of 4 threads to consume from each of the partitions > > final ExecutorService executor = > > Executors.newFixedThreadPool(numStreams); > > > > // consume the messages in the threads > > for (final KafkaMessageStream<Message> stream : streams) { > > executor.submit(new Runnable() { > > @Override > > public void run() { > > for (final Message message : stream) { > > logger.severe("!"); > > logger.severe(message.toString()); > > } > > } > > }); > > } > > } > > > > } > > > > It runs, I get no errors, and nothing happens. I don't get any messages. > I > > don't THINK it's an issue with my Kafka install for two reasons: 1. > > Zookeeper logs my client connection. 2. (Granted it's all on localhost > but) > > When I used the console consumer and producer on the instance, they seem > to > > work just fine. > > > > Methodology is to start Zookeeper, start Kafka, start above application, > > and then connect a console produce to generate messages. I'm really at a > > loss as to what's happening. Interestingly, if I put in breakpoints, I > seem > > to lose a handle as I eventually lose the ability to step over, step > into, > > and so on. > > > > I'd really appreciate any input. > > > > Cheers. > > >