Okay, here is the amended code:

import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Logger;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaMessageStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.serializer.DefaultDecoder;

public class KafkaTestConsumer {

    /**
     * @param args
     */
    public static void main(final String[] args) {
        try {
            final Logger logger = Logger.getLogger("KafkaTestConsumer");

            // specify some consumer properties
            final Properties props = new Properties();
            props.put("zk.connect", "testserver:2181");
            props.put("zk.connectiontimeout.ms", "1000000");
            props.put("groupid", "test_group");

            // Create the connection to the cluster
            final ConsumerConfig consumerConfig = new ConsumerConfig(props);
            final ConsumerConnector consumerConnector =
                    Consumer.createJavaConsumerConnector(consumerConfig);

            // create 4 partitions of the stream for topic "testTopic", to
            // allow 4 threads to consume
            final String topicName = "testTopic";
            final int numStreams = 4;
            List<KafkaMessageStream<Message>> streams = null;
            try {
                final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
                        consumerConnector.createMessageStreams(
                                Collections.singletonMap(topicName, numStreams),
                                new DefaultDecoder());
                streams = topicMessageStreams.get(topicName);
            } catch (final Exception e) {
                logger.severe(e.getMessage());
            }

            // create list of 4 threads to consume from each of the partitions
            final ExecutorService executor = Executors.newFixedThreadPool(numStreams);

            // consume the messages in the threads
            for (final KafkaMessageStream<Message> stream : streams) {
                executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            while (true) {
                                logger.severe(Calendar.getInstance().getTime().toString());
                                if (stream == null) {
                                    logger.severe("stream is NULL.");
                                } else {
                                    logger.severe("stream = " + stream);
                                    for (final Message message : stream) {
                                        logger.severe("!");
                                        logger.severe(message.toString());
                                    }
                                }
                            }
                        } catch (final Throwable t) {
                            logger.severe("In run " + t.getMessage());
                        } finally {
                            if (stream == null) {
                                logger.severe("stream is NULL.");
                            } else {
                                logger.severe("stream = " + stream);
                            }
                        }
                    }
                });
            }
        } catch (final Throwable t) {
            System.err.println("In main " + t.getMessage());
        }
    }

}
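One aside on the error handling above: logging only t.getMessage() drops the stack trace, and getMessage() is null for a bare NullPointerException, so those catch blocks can hide what actually killed the thread. Here is a minimal, Kafka-free sketch of the difference, using only java.util.logging (the thrown NPE is just a stand-in for whatever is dying in the consumer thread):

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class LoggingDemo {
    public static void main(final String[] args) {
        final Logger logger = Logger.getLogger("KafkaTestConsumer");
        try {
            // Stand-in for whatever exception is killing the consumer thread.
            throw new NullPointerException();
        } catch (final Throwable t) {
            // A bare NPE has no message, so concatenating it yields "null".
            System.out.println("message = " + t.getMessage()); // prints "message = null"
            // The three-argument overload records the exception class name
            // and the full stack trace, not just the (possibly null) message.
            logger.log(Level.SEVERE, "In run", t);
        }
    }
}
```

Switching the severe(t.getMessage()) calls to logger.log(Level.SEVERE, "In run", t) should at least show what exception type is being thrown in the stream threads.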

Interesting things happen. I get the time printed out only once for each
stream. If I use Eclipse's debugger with a breakpoint on the line "if
(stream == null) {" inside the "while (true) {" loop, the stream variable's
value shows as "<error(s)_during_the_evaluation>". If I step over that
line, I'm taken into the else clause, but the logger never executes, and
the thread seems to die when it references stream. So, I got to thinking
maybe the problem is a missing dependency somewhere, or maybe a conflict.
Here are the dependencies I have in that project's pom (the project has
other pieces):

    <dependencies>
        <!-- Import the JMS API, we use provided scope as the API is
            included in JBoss AS 7 -->
        <dependency>
            <groupId>org.jboss.spec.javax.jms</groupId>
            <artifactId>jboss-jms-api_1.1_spec</artifactId>
            <scope>provided</scope>
        </dependency>
        <!-- Import the JCA API, we use provided scope as the API is
            included in JBoss AS 7 -->
        <dependency>
            <groupId>org.jboss.spec.javax.resource</groupId>
            <artifactId>jboss-connector-api_1.6_spec</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.incubator.kafka</groupId>
            <artifactId>kafka</artifactId>
            <version>0.7.0-incubating</version>
        </dependency>
    </dependencies>
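To check whether a conflicting copy of a class is sneaking in from one of these dependencies, a small diagnostic like the following (a hypothetical helper, not part of the consumer) prints which jar a class was actually loaded from. It uses java.util.Properties so it compiles without the Kafka jar; substituting kafka.consumer.Consumer.class would show where Kafka's classes come from:

```java
import java.security.CodeSource;

public class WhichJar {
    public static void main(final String[] args) {
        // Substitute kafka.consumer.Consumer.class (or any suspect class) here.
        final Class<?> c = java.util.Properties.class;
        final CodeSource src = c.getProtectionDomain().getCodeSource();
        // Classes loaded by the bootstrap loader (the JDK itself) have a null
        // code source; application classes report their jar or directory path.
        System.out.println(c.getName() + " loaded from: "
                + (src == null ? "<bootstrap class loader>" : src.getLocation()));
    }
}
```

Running mvn dependency:tree on the project should likewise show whether any of these artifacts pull in overlapping jars.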

And here is the pom I'm using for kafka:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.apache.incubator.kafka</groupId>
    <artifactId>kafka</artifactId>
    <packaging>pom</packaging>
    <version>0.7.0-incubating</version>
    <name>Apache Kafka</name>
    <description>Apache Kafka is a distributed publish-subscribe messaging system</description>
    <url>http://incubator.apache.org/kafka</url>
    <inceptionYear>2012</inceptionYear>
    <licenses>
        <license>
            <name>The Apache Software License, Version 2.0</name>
            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
        </license>
    </licenses>
    <dependencies>
       <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.8.0</version>
       </dependency>
       <dependency>
            <groupId>com.github.sgroschupf</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.1</version>
       </dependency>
    </dependencies>
    <scm>
        <connection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</connection>
        <developerConnection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</developerConnection>
        <url>http://svn.apache.org/repos/asf/incubator/kafka/trunk</url>
    </scm>
</project>

Again, any input would be greatly appreciated. Thanks.

On Tue, May 8, 2012 at 6:34 PM, Jun Rao <jun...@gmail.com> wrote:

> Could you put consumer run() code in try/catch and log all throwables?
> Executors can eat exceptions.
>
> Thanks,
>
> Jun
>
> On Tue, May 8, 2012 at 4:08 PM, lessonz <lessonz.leg...@gmail.com> wrote:
>
> > After trying and failing to get a more complicated consumer working, I
> > decided to start at square one and use the example code. Below is my
> barely
> > modified implementation:
> >
> > import java.util.Collections;
> > import java.util.List;
> > import java.util.Map;
> > import java.util.Properties;
> > import java.util.concurrent.ExecutorService;
> > import java.util.concurrent.Executors;
> > import java.util.logging.Logger;
> >
> > import kafka.consumer.Consumer;
> > import kafka.consumer.ConsumerConfig;
> > import kafka.consumer.KafkaMessageStream;
> > import kafka.javaapi.consumer.ConsumerConnector;
> > import kafka.message.Message;
> > import kafka.serializer.DefaultDecoder;
> >
> > public class KafkaTestConsumer {
> >
> >    /**
> >     * @param args
> >     */
> >    public static void main(final String[] args) {
> >        final Logger logger = Logger.getLogger("KafkaTestConsumer");
> >
> >        // specify some consumer properties
> >        final Properties props = new Properties();
> >        props.put("zk.connect", "testserver:2181");
> >        props.put("zk.connectiontimeout.ms", "1000000");
> >        props.put("groupid", "test_group");
> >
> >        // Create the connection to the cluster
> >        final ConsumerConfig consumerConfig = new ConsumerConfig(props);
> >        final ConsumerConnector consumerConnector =
> > Consumer.createJavaConsumerConnector(consumerConfig);
> >
> >        // create 4 partitions of the stream for topic “testTopic”, to
> > allow 4
> >        // threads to consume
> >        final String topicName = "testTopic";
> >        final int numStreams = 4;
> >        List<KafkaMessageStream<Message>> streams = null;
> >        try {
> >            final Map<String, List<KafkaMessageStream<Message>>>
> > topicMessageStreams = consumerConnector
> >
> > .createMessageStreams(Collections.singletonMap(topicName, numStreams),
> new
> > DefaultDecoder());
> >            streams = topicMessageStreams.get(topicName);
> >        } catch (final Exception e) {
> >            logger.severe(e.getMessage());
> >        }
> >
> >        // create list of 4 threads to consume from each of the partitions
> >        final ExecutorService executor =
> > Executors.newFixedThreadPool(numStreams);
> >
> >        // consume the messages in the threads
> >        for (final KafkaMessageStream<Message> stream : streams) {
> >            executor.submit(new Runnable() {
> >                @Override
> >                public void run() {
> >                    for (final Message message : stream) {
> >                        logger.severe("!");
> >                        logger.severe(message.toString());
> >                    }
> >                }
> >            });
> >        }
> >    }
> >
> > }
> >
> > It runs, I get no errors, and nothing happens. I don't get any messages.
> I
> > don't THINK it's an issue with my Kafka install for two reasons: 1.
> > Zookeeper logs my client connection. 2. (Granted it's all on localhost
> but)
> > When I used the console consumer and producer on the instance, they seem
> to
> > work just fine.
> >
> > Methodology is to start Zookeeper, start Kafka, start above application,
> > and then connect a console produce to generate messages. I'm really at a
> > loss as to what's happening. Interestingly, if I put in breakpoints, I
> seem
> > to lose a handle as I eventually lose the ability to step over, step
> into,
> > and so on.
> >
> > I'd really appreciate any input.
> >
> > Cheers.
> >
>
