I would try the test with numStreams=1 and see what happens. Also, you may
want to produce some new data after the test starts.

If you still have a problem, could you get the values of the following paths
from ZK (bin/zookeeper-shell.sh) after you have started your consumer test?

/consumers/[group_id]/ids/[consumer_id]

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
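
As a rough sketch of how the placeholders in these two paths get filled in, the helper below builds them for the group and topic used in this thread. The class name, method names, consumer id, broker id 0 and partition 0 are all made up for illustration; substitute whatever your own setup actually registers.

```java
// Hypothetical helper: builds the two ZK paths quoted above from their parts.
class ConsumerZkPaths {

    // /consumers/[group_id]/ids/[consumer_id]
    static String consumerIdPath(final String groupId, final String consumerId) {
        return "/consumers/" + groupId + "/ids/" + consumerId;
    }

    // /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
    static String offsetPath(final String groupId, final String topic,
                             final int brokerId, final int partitionId) {
        return "/consumers/" + groupId + "/offsets/" + topic
                + "/" + brokerId + "-" + partitionId;
    }

    public static void main(final String[] args) {
        // "test_group" and "testTopic" come from the test code in this thread;
        // the consumer id and the broker/partition numbers are assumed values.
        System.out.println(consumerIdPath("test_group", "example_consumer_id"));
        System.out.println(offsetPath("test_group", "testTopic", 0, 0));
    }
}
```

Each printed path can then be looked up in the ZK shell with its get command.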

Jun

On Wed, May 9, 2012 at 8:49 AM, lessonz <lessonz.leg...@gmail.com> wrote:

> Okay, here is the amended code:
>
> import java.util.Calendar;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.logging.Logger;
>
> import kafka.consumer.Consumer;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.KafkaMessageStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.message.Message;
> import kafka.serializer.DefaultDecoder;
>
> public class KafkaTestConsumer {
>
>    /**
>     * @param args
>     */
>    public static void main(final String[] args) {
>         try {
>             final Logger logger = Logger.getLogger("KafkaTestConsumer");
>
>            // specify some consumer properties
>            final Properties props = new Properties();
>            props.put("zk.connect", "testserver:2181");
>            props.put("zk.connectiontimeout.ms", "1000000");
>            props.put("groupid", "test_group");
>
>            // Create the connection to the cluster
>            final ConsumerConfig consumerConfig = new ConsumerConfig(props);
>            final ConsumerConnector consumerConnector =
>                Consumer.createJavaConsumerConnector(consumerConfig);
>
>            // create 4 partitions of the stream for topic “testTopic”, to
>            // allow 4 threads to consume
>            final String topicName = "testTopic";
>            final int numStreams = 4;
>            List<KafkaMessageStream<Message>> streams = null;
>            try {
>                final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
>                    consumerConnector.createMessageStreams(
>                        Collections.singletonMap(topicName, numStreams), new DefaultDecoder());
>                streams = topicMessageStreams.get(topicName);
>            } catch (final Exception e) {
>                logger.severe(e.getMessage());
>            }
>
>            // create list of 4 threads to consume from each of the partitions
>            final ExecutorService executor =
>                Executors.newFixedThreadPool(numStreams);
>
>            // consume the messages in the threads
>            for (final KafkaMessageStream<Message> stream : streams) {
>                executor.submit(new Runnable() {
>                    @Override
>                    public void run() {
>                        try {
>                            while (true) {
>                                logger.severe(Calendar.getInstance().getTime().toString());
>                                if (stream == null) {
>                                    logger.severe("stream is NULL.");
>                                } else {
>                                    logger.severe("stream = " + stream);
>                                     for (final Message message : stream) {
>                                        logger.severe("!");
>                                        logger.severe(message.toString());
>                                    }
>                                }
>                            }
>                         } catch (final Throwable t) {
>                            logger.severe("In run " + t.getMessage());
>                        } finally {
>                            if (stream == null) {
>                                logger.severe("stream is NULL.");
>                            } else {
>                                logger.severe("stream = " + stream);
>                            }
>                        }
>                    }
>                });
>            }
>        } catch (final Throwable t) {
>            System.err.println("In main" + t.getMessage());
>        }
>    }
>
> }
>
> Interesting things happen. I get the time printed out only once for each
> stream. If I use Eclipse's debugger and a breakpoint on the line "if
> (stream == null) {" in the "while (true) {" loop, the variable stream says
> "<error(s)_during_the_evaluation>" for value. If I step over this line, I'm
> taken into the else clause, but the logger is never executed, and seems to
> die when referencing the stream value. So, I got to thinking maybe the
> problem is a missing dependency somewhere or maybe a conflict. So, here are
> the dependencies I have in that project's pom (the project has other
> pieces):
>
>    <dependencies>
>        <!-- Import the JMS API, we use provided scope as the API is
>            included in JBoss AS 7 -->
>        <dependency>
>            <groupId>org.jboss.spec.javax.jms</groupId>
>            <artifactId>jboss-jms-api_1.1_spec</artifactId>
>            <scope>provided</scope>
>        </dependency>
>        <!-- Import the JCA API, we use provided scope as the API is
>            included in JBoss AS 7 -->
>        <dependency>
>            <groupId>org.jboss.spec.javax.resource</groupId>
>            <artifactId>jboss-connector-api_1.6_spec</artifactId>
>            <scope>provided</scope>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.incubator.kafka</groupId>
>            <artifactId>kafka</artifactId>
>            <version>0.7.0-incubating</version>
>        </dependency>
>    </dependencies>
>
> And here is the pom I'm using for kafka:
>
> <?xml version="1.0" encoding="UTF-8"?>
> <project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/xsd/maven-4.0.0.xsd"
> xmlns="http://maven.apache.org/POM/4.0.0"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
>    <modelVersion>4.0.0</modelVersion>
>    <groupId>org.apache.incubator.kafka</groupId>
>    <artifactId>kafka</artifactId>
>    <packaging>pom</packaging>
>    <version>0.7.0-incubating</version>
>    <name>Apache Kafka</name>
>    <description>Apache Kafka is a distributed publish-subscribe messaging system</description>
>    <url>http://incubator.apache.org/kafka</url>
>    <inceptionYear>2012</inceptionYear>
>    <licenses>
>        <license>
>            <name>The Apache Software License, Version 2.0</name>
>            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
>        </license>
>    </licenses>
>    <dependencies>
>       <dependency>
>            <groupId>org.scala-lang</groupId>
>            <artifactId>scala-library</artifactId>
>            <version>2.8.0</version>
>       </dependency>
>       <dependency>
>            <groupId>com.github.sgroschupf</groupId>
>            <artifactId>zkclient</artifactId>
>            <version>0.1</version>
>       </dependency>
>    </dependencies>
>    <scm>
>        <connection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</connection>
>        <developerConnection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</developerConnection>
>        <url>http://svn.apache.org/repos/asf/incubator/kafka/trunk</url>
>    </scm>
> </project>
>
> Again, any input would be greatly appreciated. Thanks.
>
> On Tue, May 8, 2012 at 6:34 PM, Jun Rao <jun...@gmail.com> wrote:
>
> > Could you put consumer run() code in try/catch and log all throwables?
> > Executors can eat exceptions.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, May 8, 2012 at 4:08 PM, lessonz <lessonz.leg...@gmail.com> wrote:
> >
> > > After trying and failing to get a more complicated consumer working, I
> > > decided to start at square one and use the example code. Below is my
> > > barely modified implementation:
> > >
> > > import java.util.Collections;
> > > import java.util.List;
> > > import java.util.Map;
> > > import java.util.Properties;
> > > import java.util.concurrent.ExecutorService;
> > > import java.util.concurrent.Executors;
> > > import java.util.logging.Logger;
> > >
> > > import kafka.consumer.Consumer;
> > > import kafka.consumer.ConsumerConfig;
> > > import kafka.consumer.KafkaMessageStream;
> > > import kafka.javaapi.consumer.ConsumerConnector;
> > > import kafka.message.Message;
> > > import kafka.serializer.DefaultDecoder;
> > >
> > > public class KafkaTestConsumer {
> > >
> > >    /**
> > >     * @param args
> > >     */
> > >    public static void main(final String[] args) {
> > >        final Logger logger = Logger.getLogger("KafkaTestConsumer");
> > >
> > >        // specify some consumer properties
> > >        final Properties props = new Properties();
> > >        props.put("zk.connect", "testserver:2181");
> > >        props.put("zk.connectiontimeout.ms", "1000000");
> > >        props.put("groupid", "test_group");
> > >
> > >        // Create the connection to the cluster
> > >        final ConsumerConfig consumerConfig = new ConsumerConfig(props);
> > > >        final ConsumerConnector consumerConnector =
> > > >            Consumer.createJavaConsumerConnector(consumerConfig);
> > > >
> > > >        // create 4 partitions of the stream for topic “testTopic”, to
> > > >        // allow 4 threads to consume
> > >        final String topicName = "testTopic";
> > >        final int numStreams = 4;
> > >        List<KafkaMessageStream<Message>> streams = null;
> > >        try {
> > > >            final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
> > > >                consumerConnector.createMessageStreams(
> > > >                    Collections.singletonMap(topicName, numStreams), new DefaultDecoder());
> > >            streams = topicMessageStreams.get(topicName);
> > >        } catch (final Exception e) {
> > >            logger.severe(e.getMessage());
> > >        }
> > >
> > > >        // create list of 4 threads to consume from each of the partitions
> > > >        final ExecutorService executor =
> > > >            Executors.newFixedThreadPool(numStreams);
> > >
> > >        // consume the messages in the threads
> > >        for (final KafkaMessageStream<Message> stream : streams) {
> > >            executor.submit(new Runnable() {
> > >                @Override
> > >                public void run() {
> > >                    for (final Message message : stream) {
> > >                        logger.severe("!");
> > >                        logger.severe(message.toString());
> > >                    }
> > >                }
> > >            });
> > >        }
> > >    }
> > >
> > > }
> > >
> > > > It runs, I get no errors, and nothing happens. I don't get any
> > > > messages. I don't THINK it's an issue with my Kafka install for two
> > > > reasons: 1. Zookeeper logs my client connection. 2. (Granted it's all
> > > > on localhost but) When I used the console consumer and producer on
> > > > the instance, they seem to work just fine.
> > >
> > > > Methodology is to start Zookeeper, start Kafka, start above
> > > > application, and then connect a console producer to generate
> > > > messages. I'm really at a loss as to what's happening. Interestingly,
> > > > if I put in breakpoints, I seem to lose a handle as I eventually lose
> > > > the ability to step over, step into, and so on.
> > >
> > > I'd really appreciate any input.
> > >
> > > Cheers.
> > >
> >
>
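
For reference, the point made in the thread that Executors can eat exceptions can be reproduced without Kafka at all. The sketch below is only an illustration (the class and method names are invented here): a task passed to submit() throws, nothing appears on the console, and the failure only becomes visible when the returned Future is queried with get().

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: shows where an exception thrown inside a
// submitted task ends up.
class SwallowedExceptionDemo {

    // Submits a task that always throws, then reports what Future.get() reveals.
    static String runAndReport() {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            final Future<?> future = executor.submit(new Runnable() {
                @Override
                public void run() {
                    // With submit(), this is captured in the Future, not printed.
                    throw new IllegalStateException("boom");
                }
            });
            future.get(); // rethrows the task's failure wrapped in ExecutionException
            return "no failure";
        } catch (final ExecutionException e) {
            final Throwable cause = e.getCause();
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executor.shutdown();
        }
    }

    public static void main(final String[] args) {
        System.out.println(runAndReport());
    }
}
```

Wrapping the body of run() in try/catch and logging every Throwable, as suggested in the thread, is the other way to make such failures visible when the Future is never inspected.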
