I realize message.toString() is not the way to access the payload, but
unless Scala overrides it, toString should fall back to Object's
implementation, which at least prints an object reference. In any event, I
modified the for-loop to:

                        for (final Message message : stream) {
                            logger.severe("!");
                            // Copy the payload bytes out of the ByteBuffer.
                            final ByteBuffer buffer = message.payload();
                            final byte[] bytes = new byte[buffer.remaining()];
                            buffer.get(bytes);
                            // Decodes with the platform default charset.
                            logger.severe(new String(bytes));
                        }
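
(As an aside, new String(bytes) decodes with the platform default charset; if
the producer wrote UTF-8, new String(bytes, "UTF-8") would be the safer
decode, though that shouldn't affect whether anything prints at all.)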

And there was no change in behavior. I'm now familiarizing myself with sbt
to figure out whether I'm missing a dependency. Thanks.
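
P.S. One way I plan to check for a silently missing dependency is to probe
for the classes the consumer path has to be able to load. This is just a
sketch; the first four class names appear in this thread, while the
ZooKeeper and log4j probes are my assumption about what zkclient and Kafka
0.7 pull in transitively:

import java.util.Arrays;
import java.util.List;

public class DependencyProbe {
    public static void main(final String[] args) {
        // Classes the consumer path should be able to load; the last two
        // (ZooKeeper and log4j) are my assumption, not confirmed.
        final List<String> classNames = Arrays.asList(
                "kafka.consumer.Consumer",
                "kafka.javaapi.consumer.ConsumerConnector",
                "scala.ScalaObject",
                "org.I0Itec.zkclient.ZkClient",
                "org.apache.zookeeper.ZooKeeper",
                "org.apache.log4j.Logger");
        for (final String className : classNames) {
            try {
                // Loading the class forces its static dependencies to resolve.
                Class.forName(className);
                System.out.println("OK      " + className);
            } catch (final Throwable t) {
                // Catches ClassNotFoundException and NoClassDefFoundError alike.
                System.out.println("MISSING " + className + " (" + t + ")");
            }
        }
    }
}

If any of these print MISSING on the application's actual runtime classpath,
that would explain a silently swallowed NoClassDefFoundError.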

On Fri, May 11, 2012 at 10:39 AM, Jun Rao <jun...@gmail.com> wrote:

> Also, message.toString() is not the right way to convert the message to
> String. You need to take the payload from the message and then convert the
> bytes to string. Take a look at
> examples/src/main/java/kafka/examples/ExampleUtils.java
>
> Jun
>
> On Fri, May 11, 2012 at 9:30 AM, lessonz <lessonz.leg...@gmail.com> wrote:
>
> > Upon commenting out the logging output (the first and the identical one
> > in the finally clause), I'm still not getting anything from the server.
> > I'm assuming the sample client code works for others. That's why I'm
> > worried my problem might be more environmental, like a missing
> > dependency. Has anyone else tested the sample code?
> >
> > On Fri, May 11, 2012 at 10:13 AM, Jun Rao <jun...@gmail.com> wrote:
> >
> > > Try not to do logger.severe("stream = " + stream). I am not sure if a
> > > stream is printable.
> > >
> > > Jun
> > >
> > > On Fri, May 11, 2012 at 8:27 AM, lessonz <lessonz.leg...@gmail.com> wrote:
> > >
> > > > Okay, here's the newly amended code:
> > > >
> > > > import java.util.Calendar;
> > > > import java.util.Collections;
> > > > import java.util.List;
> > > > import java.util.Map;
> > > > import java.util.Properties;
> > > > import java.util.logging.Logger;
> > > >
> > > > import kafka.consumer.Consumer;
> > > > import kafka.consumer.ConsumerConfig;
> > > > import kafka.consumer.KafkaMessageStream;
> > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > import kafka.message.Message;
> > > > import kafka.serializer.DefaultDecoder;
> > > >
> > > > public class KafkaTestConsumer {
> > > >
> > > >     /**
> > > >      * @param args
> > > >      */
> > > >     public static void main(final String[] args) {
> > > >         final Logger logger = Logger.getLogger("KafkaTestConsumer");
> > > >         try {
> > > >             // specify some consumer properties
> > > >             final Properties props = new Properties();
> > > >             props.put("zk.connect", "testServer:2181");
> > > >             props.put("zk.connectiontimeout.ms", "1000000");
> > > >             props.put("groupid", "test_group");
> > > >
> > > >             // Create the connection to the cluster
> > > >             final ConsumerConfig consumerConfig = new ConsumerConfig(props);
> > > >             final ConsumerConnector consumerConnector =
> > > >                     Consumer.createJavaConsumerConnector(consumerConfig);
> > > >
> > > >             // create a single stream for topic "testTopic", consumed
> > > >             // by one thread
> > > >             final String topicName = "testTopic";
> > > >             final int numStreams = 1;
> > > >             List<KafkaMessageStream<Message>> streams = null;
> > > >             try {
> > > >                 final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
> > > >                         consumerConnector.createMessageStreams(
> > > >                                 Collections.singletonMap(topicName, numStreams),
> > > >                                 new DefaultDecoder());
> > > >                 streams = topicMessageStreams.get(topicName);
> > > >             } catch (final Exception e) {
> > > >                 logger.severe(e.getMessage());
> > > >             }
> > > >
> > > >             final KafkaMessageStream<Message> stream = streams.get(0);
> > > >
> > > >             final Thread thread = new Thread(new Runnable() {
> > > >                 @Override
> > > >                 public void run() {
> > > >                     try {
> > > >                         while (true) {
> > > >                             logger.severe(Calendar.getInstance().getTime().toString());
> > > >                             if (stream == null) {
> > > >                                 logger.severe("stream is NULL.");
> > > >                             } else {
> > > >                                 logger.severe("stream = " + stream);
> > > >                                 for (final Message message : stream) {
> > > >                                     logger.severe("!");
> > > >                                     logger.severe(message.toString());
> > > >                                 }
> > > >                             }
> > > >                         }
> > > >                     } catch (final Throwable t) {
> > > >                         logger.severe("In run " + t.getMessage());
> > > >                     } finally {
> > > >                         if (stream == null) {
> > > >                             logger.severe("stream is NULL.");
> > > >                         } else {
> > > >                             logger.severe("stream = " + stream);
> > > >                         }
> > > >                     }
> > > >                 }
> > > >             });
> > > >
> > > >             thread.start();
> > > >         } catch (final Throwable t) {
> > > >             logger.severe("In main " + t.getMessage());
> > > >         }
> > > >     }
> > > > }
> > > >
> > > > Behavior is identical to using the executor. I get the logged time,
> > > > but nothing else. Under the debugger, stream is not null, but as soon
> > > > as I try to reference it in the line "logger.severe("stream = " +
> > > > stream);" it dies; I don't get that output. So, it seems in both
> > > > cases, my call to consumerConnector.createMessageStreams(...) is
> > > > returning a Map of the right size, but its contents appear to be
> > > > corrupt.
> > > >
> > > > I have the source, but I really know nothing about Scala. In looking
> > > > at kafka.javaapi.consumer.ZookeeperConsumerConnector#createMessageStreams,
> > > > it all seems pretty straightforward. I have to wonder again whether
> > > > I'm somehow throwing a silent class-definition-not-found error. For
> > > > dependencies I only have scala-library 2.8.0 and zkclient 0.1. Is
> > > > that really all that's needed? Are they the right versions?
> > > >
> > > > Again, thanks for your help.
> > > >
> > > >
> > > > On Thu, May 10, 2012 at 6:01 PM, Jun Rao <jun...@gmail.com> wrote:
> > > >
> > > > > Another thing that I would suggest is to not use Executors and
> > > > > start your own thread for consumption.
> > > > >
> > > > > Jun
> > > > >
> > > > > On Thu, May 10, 2012 at 12:14 PM, lessonz <lessonz.leg...@gmail.com> wrote:
> > > > >
> > > > > > Yes, get /consumers/test_group/offsets/testTopic/0-0 shows values
> > > > > > changing. Neither /consumers/[group_id]/ids/[consumer_id] nor
> > > > > > /consumers/test_group/owners changes any values. Hopefully that
> > > > > > tells us something? Again, thanks for all of your help.
> > > > > >
> > > > > >
> > > > > > On Thu, May 10, 2012 at 10:28 AM, Jun Rao <jun...@gmail.com> wrote:
> > > > > >
> > > > > > > So you are saying that the value in
> > > > > > > /consumers/test_group/offsets/testTopic/0-0 is moving? That
> > > > > > > typically means that the consumer is consuming. What about the
> > > > > > > values of /consumers/test_group/ids and
> > > > > > > /consumers/test_group/owners?
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Thu, May 10, 2012 at 8:53 AM, lessonz <lessonz.leg...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Well, that makes more sense. Sorry about that.
> > > > > > > >
> > > > > > > > Okay, so here are the results (I subbed in the [client machine
> > > > > > > > name] for the actual value):
> > > > > > > >
> > > > > > > > ls /consumers/test_group/ids
> > > > > > > > [test_group_[client machine name]-1336664541413-4179e37b]
> > > > > > > > get /consumers/test_group/ids/test_group_[client machine name]-1336664541413-4179e37b
> > > > > > > > { "testTopic": 1 }
> > > > > > > > cZxid = 0xd6
> > > > > > > > ctime = Thu May 10 09:39:39 MDT 2012
> > > > > > > > mZxid = 0xd6
> > > > > > > > mtime = Thu May 10 09:39:39 MDT 2012
> > > > > > > > pZxid = 0xd6
> > > > > > > > cversion = 0
> > > > > > > > dataVersion = 0
> > > > > > > > aclVersion = 0
> > > > > > > > ephemeralOwner = 0x137376733c00002
> > > > > > > > dataLength = 18
> > > > > > > > numChildren = 0
> > > > > > > >
> > > > > > > > and
> > > > > > > >
> > > > > > > > ls /consumers/test_group/offsets/testTopic
> > > > > > > > [0-0]
> > > > > > > > get /consumers/test_group/offsets/testTopic/0-0
> > > > > > > > -1
> > > > > > > > cZxid = 0x1d
> > > > > > > > ctime = Wed May 09 13:29:24 MDT 2012
> > > > > > > > mZxid = 0x106
> > > > > > > > mtime = Thu May 10 09:47:18 MDT 2012
> > > > > > > > pZxid = 0x1d
> > > > > > > > cversion = 0
> > > > > > > > dataVersion = 211
> > > > > > > > aclVersion = 0
> > > > > > > > ephemeralOwner = 0x0
> > > > > > > > dataLength = 2
> > > > > > > > numChildren = 0
> > > > > > > >
> > > > > > > > After sending some messages via the console producer, the only
> > > > > > > > values that appear to change are (unchanged values omitted):
> > > > > > > >
> > > > > > > > get /consumers/test_group/offsets/testTopic/0-0
> > > > > > > > mZxid = 0x11a
> > > > > > > > mtime = Thu May 10 09:50:18 MDT 2012
> > > > > > > > dataVersion = 229
> > > > > > > >
> > > > > > > > I'm not sure if that helps any, but I really appreciate your
> > > > > > > > assistance.
> > > > > > > >
> > > > > > > > On Wed, May 9, 2012 at 3:08 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Run bin/zookeeper-shell.sh. This will bring up an interactive
> > > > > > > > > ZK shell. Type ? to see the list of commands. You can do
> > > > > > > > > things like "ls /consumers/[group_id]/ids" and
> > > > > > > > > "get /consumers/[group_id]/ids/[consumer_id]". Do this while
> > > > > > > > > your consumer code is running.
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > > On Wed, May 9, 2012 at 12:53 PM, lessonz <lessonz.leg...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > I must apologize for my ignorance. I'm not sure where I
> > > > > > > > > > should be looking for these values. The
> > > > > > > > > > bin/zookeeper-shell.sh doesn't have them (I don't think it
> > > > > > > > > > should). When I cleaned the log directory to start from
> > > > > > > > > > scratch, I get /tmp/zookeeper/version-2/log.1 and
> > > > > > > > > > /tmp/zookeeper/version-2/. First, they both appear to be
> > > > > > > > > > binary; should they be? Since they're binary and I'm using
> > > > > > > > > > PuTTY, grepping is of limited value
> > > > > > > > > > (http://www.chiark.greenend.org.uk/~sgtatham/putty/faq.html#faq-puttyputty).
> > > > > > > > > > The values you mentioned before definitely appear in log.1,
> > > > > > > > > > but because of the binary, I'm not seeing any assignment
> > > > > > > > > > operators, and trying to make heads or tails of it is
> > > > > > > > > > currently beyond me. Also interesting: pretty much
> > > > > > > > > > immediately, log.1 caps at 67108864 bytes and stays there.
> > > > > > > > > > Its timestamp is being updated, so it's still getting
> > > > > > > > > > touched.
> > > > > > > > > >
> > > > > > > > > > So, I'm probably looking in the wrong place. Where/how can
> > > > > > > > > > I find these values?
> > > > > > > > > >
> > > > > > > > > > On Wed, May 9, 2012 at 11:09 AM, Jun Rao <jun...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > I would try the test with numStreams=1 and see what
> > > > > > > > > > > happens. Also, you may want to produce some new data
> > > > > > > > > > > after the test starts.
> > > > > > > > > > >
> > > > > > > > > > > If you still have problems, could you get the values of
> > > > > > > > > > > the following paths from ZK (bin/zookeeper-shell.sh)
> > > > > > > > > > > after you have started your consumer test?
> > > > > > > > > > >
> > > > > > > > > > > /consumers/[group_id]/ids/[consumer_id]
> > > > > > > > > > > /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
> > > > > > > > > > >
> > > > > > > > > > > Jun
> > > > > > > > > > >
> > > > > > > > > > > On Wed, May 9, 2012 at 8:49 AM, lessonz <lessonz.leg...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Okay, here is the amended code:
> > > > > > > > > > > >
> > > > > > > > > > > > import java.util.Calendar;
> > > > > > > > > > > > import java.util.Collections;
> > > > > > > > > > > > import java.util.List;
> > > > > > > > > > > > import java.util.Map;
> > > > > > > > > > > > import java.util.Properties;
> > > > > > > > > > > > import java.util.concurrent.ExecutorService;
> > > > > > > > > > > > import java.util.concurrent.Executors;
> > > > > > > > > > > > import java.util.logging.Logger;
> > > > > > > > > > > >
> > > > > > > > > > > > import kafka.consumer.Consumer;
> > > > > > > > > > > > import kafka.consumer.ConsumerConfig;
> > > > > > > > > > > > import kafka.consumer.KafkaMessageStream;
> > > > > > > > > > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > > > > > > > > > import kafka.message.Message;
> > > > > > > > > > > > import kafka.serializer.DefaultDecoder;
> > > > > > > > > > > >
> > > > > > > > > > > > public class KafkaTestConsumer {
> > > > > > > > > > > >
> > > > > > > > > > > >     /**
> > > > > > > > > > > >      * @param args
> > > > > > > > > > > >      */
> > > > > > > > > > > >     public static void main(final String[] args) {
> > > > > > > > > > > >         try {
> > > > > > > > > > > >             final Logger logger = Logger.getLogger("KafkaTestConsumer");
> > > > > > > > > > > >
> > > > > > > > > > > >             // specify some consumer properties
> > > > > > > > > > > >             final Properties props = new Properties();
> > > > > > > > > > > >             props.put("zk.connect", "testserver:2181");
> > > > > > > > > > > >             props.put("zk.connectiontimeout.ms", "1000000");
> > > > > > > > > > > >             props.put("groupid", "test_group");
> > > > > > > > > > > >
> > > > > > > > > > > >             // Create the connection to the cluster
> > > > > > > > > > > >             final ConsumerConfig consumerConfig = new ConsumerConfig(props);
> > > > > > > > > > > >             final ConsumerConnector consumerConnector =
> > > > > > > > > > > >                     Consumer.createJavaConsumerConnector(consumerConfig);
> > > > > > > > > > > >
> > > > > > > > > > > >             // create 4 partitions of the stream for topic "testTopic",
> > > > > > > > > > > >             // to allow 4 threads to consume
> > > > > > > > > > > >             final String topicName = "testTopic";
> > > > > > > > > > > >             final int numStreams = 4;
> > > > > > > > > > > >             List<KafkaMessageStream<Message>> streams = null;
> > > > > > > > > > > >             try {
> > > > > > > > > > > >                 final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
> > > > > > > > > > > >                         consumerConnector.createMessageStreams(
> > > > > > > > > > > >                                 Collections.singletonMap(topicName, numStreams),
> > > > > > > > > > > >                                 new DefaultDecoder());
> > > > > > > > > > > >                 streams = topicMessageStreams.get(topicName);
> > > > > > > > > > > >             } catch (final Exception e) {
> > > > > > > > > > > >                 logger.severe(e.getMessage());
> > > > > > > > > > > >             }
> > > > > > > > > > > >
> > > > > > > > > > > >             // create list of 4 threads to consume from each of the partitions
> > > > > > > > > > > >             final ExecutorService executor = Executors.newFixedThreadPool(numStreams);
> > > > > > > > > > > >
> > > > > > > > > > > >             // consume the messages in the threads
> > > > > > > > > > > >             for (final KafkaMessageStream<Message> stream : streams) {
> > > > > > > > > > > >                 executor.submit(new Runnable() {
> > > > > > > > > > > >                     @Override
> > > > > > > > > > > >                     public void run() {
> > > > > > > > > > > >                         try {
> > > > > > > > > > > >                             while (true) {
> > > > > > > > > > > >                                 logger.severe(Calendar.getInstance().getTime().toString());
> > > > > > > > > > > >                                 if (stream == null) {
> > > > > > > > > > > >                                     logger.severe("stream is NULL.");
> > > > > > > > > > > >                                 } else {
> > > > > > > > > > > >                                     logger.severe("stream = " + stream);
> > > > > > > > > > > >                                     for (final Message message : stream) {
> > > > > > > > > > > >                                         logger.severe("!");
> > > > > > > > > > > >                                         logger.severe(message.toString());
> > > > > > > > > > > >                                     }
> > > > > > > > > > > >                                 }
> > > > > > > > > > > >                             }
> > > > > > > > > > > >                         } catch (final Throwable t) {
> > > > > > > > > > > >                             logger.severe("In run " + t.getMessage());
> > > > > > > > > > > >                         } finally {
> > > > > > > > > > > >                             if (stream == null) {
> > > > > > > > > > > >                                 logger.severe("stream is NULL.");
> > > > > > > > > > > >                             } else {
> > > > > > > > > > > >                                 logger.severe("stream = " + stream);
> > > > > > > > > > > >                             }
> > > > > > > > > > > >                         }
> > > > > > > > > > > >                     }
> > > > > > > > > > > >                 });
> > > > > > > > > > > >             }
> > > > > > > > > > > >         } catch (final Throwable t) {
> > > > > > > > > > > >             System.err.println("In main " + t.getMessage());
> > > > > > > > > > > >         }
> > > > > > > > > > > >     }
> > > > > > > > > > > > }
> > > > > > > > > > > >
> > > > > > > > > > > > Interesting things happen. I get the time printed out
> > > > > > > > > > > > only once for each stream. If I use Eclipse's debugger
> > > > > > > > > > > > and a breakpoint on the line "if (stream == null) {" in
> > > > > > > > > > > > the "while (true) {" loop, the variable stream shows
> > > > > > > > > > > > "<error(s)_during_the_evaluation>" for its value. If I
> > > > > > > > > > > > step over this line, I'm taken into the else clause,
> > > > > > > > > > > > but the logger is never executed and seems to die when
> > > > > > > > > > > > referencing the stream value. So, I got to thinking
> > > > > > > > > > > > maybe the problem is a missing dependency somewhere, or
> > > > > > > > > > > > maybe a conflict. So, here are the dependencies I have
> > > > > > > > > > > > in that project's pom (the project has other pieces):
> > > > > > > > > > > >
> > > > > > > > > > > >     <dependencies>
> > > > > > > > > > > >         <!-- Import the JMS API; we use provided scope as
> > > > > > > > > > > >             the API is included in JBoss AS 7 -->
> > > > > > > > > > > >         <dependency>
> > > > > > > > > > > >             <groupId>org.jboss.spec.javax.jms</groupId>
> > > > > > > > > > > >             <artifactId>jboss-jms-api_1.1_spec</artifactId>
> > > > > > > > > > > >             <scope>provided</scope>
> > > > > > > > > > > >         </dependency>
> > > > > > > > > > > >         <!-- Import the JCA API; we use provided scope as
> > > > > > > > > > > >             the API is included in JBoss AS 7 -->
> > > > > > > > > > > >         <dependency>
> > > > > > > > > > > >             <groupId>org.jboss.spec.javax.resource</groupId>
> > > > > > > > > > > >             <artifactId>jboss-connector-api_1.6_spec</artifactId>
> > > > > > > > > > > >             <scope>provided</scope>
> > > > > > > > > > > >         </dependency>
> > > > > > > > > > > >         <dependency>
> > > > > > > > > > > >             <groupId>org.apache.incubator.kafka</groupId>
> > > > > > > > > > > >             <artifactId>kafka</artifactId>
> > > > > > > > > > > >             <version>0.7.0-incubating</version>
> > > > > > > > > > > >         </dependency>
> > > > > > > > > > > >     </dependencies>
> > > > > > > > > > > >
> > > > > > > > > > > > And here is the pom I'm using for kafka:
> > > > > > > > > > > >
> > > > > > > > > > > > <?xml version="1.0" encoding="UTF-8"?>
> > > > > > > > > > > > <project xmlns="http://maven.apache.org/POM/4.0.0"
> > > > > > > > > > > >          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> > > > > > > > > > > >          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
> > > > > > > > > > > >     <modelVersion>4.0.0</modelVersion>
> > > > > > > > > > > >     <groupId>org.apache.incubator.kafka</groupId>
> > > > > > > > > > > >     <artifactId>kafka</artifactId>
> > > > > > > > > > > >     <packaging>pom</packaging>
> > > > > > > > > > > >     <version>0.7.0-incubating</version>
> > > > > > > > > > > >     <name>Apache Kafka</name>
> > > > > > > > > > > >     <description>Apache Kafka is a distributed publish-subscribe messaging system</description>
> > > > > > > > > > > >     <url>http://incubator.apache.org/kafka</url>
> > > > > > > > > > > >     <inceptionYear>2012</inceptionYear>
> > > > > > > > > > > >     <licenses>
> > > > > > > > > > > >         <license>
> > > > > > > > > > > >             <name>The Apache Software License, Version 2.0</name>
> > > > > > > > > > > >             <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
> > > > > > > > > > > >         </license>
> > > > > > > > > > > >     </licenses>
> > > > > > > > > > > >     <dependencies>
> > > > > > > > > > > >         <dependency>
> > > > > > > > > > > >             <groupId>org.scala-lang</groupId>
> > > > > > > > > > > >             <artifactId>scala-library</artifactId>
> > > > > > > > > > > >             <version>2.8.0</version>
> > > > > > > > > > > >         </dependency>
> > > > > > > > > > > >         <dependency>
> > > > > > > > > > > >             <groupId>com.github.sgroschupf</groupId>
> > > > > > > > > > > >             <artifactId>zkclient</artifactId>
> > > > > > > > > > > >             <version>0.1</version>
> > > > > > > > > > > >         </dependency>
> > > > > > > > > > > >     </dependencies>
> > > > > > > > > > > >     <scm>
> > > > > > > > > > > >         <connection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</connection>
> > > > > > > > > > > >         <developerConnection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</developerConnection>
> > > > > > > > > > > >         <url>http://svn.apache.org/repos/asf/incubator/kafka/trunk</url>
> > > > > > > > > > > >     </scm>
> > > > > > > > > > > > </project>
> > > > > > > > > > > >
> > > > > > > > > > > > Again, any input would be greatly appreciated. Thanks.
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, May 8, 2012 at 6:34 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Could you put the consumer run() code in a try/catch
> > > > > > > > > > > > > and log all throwables? Executors can eat exceptions.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Jun
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Tue, May 8, 2012 at 4:08 PM, lessonz <lessonz.leg...@gmail.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > After trying and failing to get a more complicated
> > > > > > > > > > > > > > consumer working, I decided to start at square one
> > > > > > > > > > > > > > and use the example code. Below is my barely
> > > > > > > > > > > > > > modified implementation:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > import java.util.Collections;
> > > > > > > > > > > > > > import java.util.List;
> > > > > > > > > > > > > > import java.util.Map;
> > > > > > > > > > > > > > import java.util.Properties;
> > > > > > > > > > > > > > import java.util.concurrent.ExecutorService;
> > > > > > > > > > > > > > import java.util.concurrent.Executors;
> > > > > > > > > > > > > > import java.util.logging.Logger;
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > import kafka.consumer.Consumer;
> > > > > > > > > > > > > > import kafka.consumer.ConsumerConfig;
> > > > > > > > > > > > > > import kafka.consumer.KafkaMessageStream;
> > > > > > > > > > > > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > > > > > > > > > > > import kafka.message.Message;
> > > > > > > > > > > > > > import kafka.serializer.DefaultDecoder;
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > public class KafkaTestConsumer {
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >     /**
> > > > > > > > > > > > > >      * @param args
> > > > > > > > > > > > > >      */
> > > > > > > > > > > > > >     public static void main(final String[] args) {
> > > > > > > > > > > > > >         final Logger logger = Logger.getLogger("KafkaTestConsumer");
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >         // specify some consumer properties
> > > > > > > > > > > > > >         final Properties props = new Properties();
> > > > > > > > > > > > > >         props.put("zk.connect", "testserver:2181");
> > > > > > > > > > > > > >         props.put("zk.connectiontimeout.ms", "1000000");
> > > > > > > > > > > > > >         props.put("groupid", "test_group");
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >         // Create the connection to the cluster
> > > > > > > > > > > > > >         final ConsumerConfig consumerConfig = new ConsumerConfig(props);
> > > > > > > > > > > > > >         final ConsumerConnector consumerConnector =
> > > > > > > > > > > > > >                 Consumer.createJavaConsumerConnector(consumerConfig);
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >         // create 4 partitions of the stream for topic "testTopic",
> > > > > > > > > > > > > >         // to allow 4 threads to consume
> > > > > > > > > > > > > >         final String topicName = "testTopic";
> > > > > > > > > > > > > >         final int numStreams = 4;
> > > > > > > > > > > > > >         List<KafkaMessageStream<Message>> streams = null;
> > > > > > > > > > > > > >         try {
> > > > > > > > > > > > > >             final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
> > > > > > > > > > > > > >                     consumerConnector.createMessageStreams(
> > > > > > > > > > > > > >                             Collections.singletonMap(topicName, numStreams),
> > > > > > > > > > > > > >                             new DefaultDecoder());
> > > > > > > > > > > > > >             streams = topicMessageStreams.get(topicName);
> > > > > > > > > > > > > >         } catch (final Exception e) {
> > > > > > > > > > > > > >             logger.severe(e.getMessage());
> > > > > > > > > > > > > >         }
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >         // create list of 4 threads to consume from each of the partitions
> > > > > > > > > > > > > >         final ExecutorService executor = Executors.newFixedThreadPool(numStreams);
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >         // consume the messages in the threads
> > > > > > > > > > > > > >         for (final KafkaMessageStream<Message> stream : streams) {
> > > > > > > > > > > > > >             executor.submit(new Runnable() {
> > > > > > > > > > > > > >                 @Override
> > > > > > > > > > > > > >                 public void run() {
> > > > > > > > > > > > > >                     for (final Message message : stream) {
> > > > > > > > > > > > > >                         logger.severe("!");
> > > > > > > > > > > > > >                         logger.severe(message.toString());
> > > > > > > > > > > > > >                     }
> > > > > > > > > > > > > >                 }
> > > > > > > > > > > > > >             });
> > > > > > > > > > > > > >         }
> > > > > > > > > > > > > >     }
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > }
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > It runs, I get no errors, and nothing happens. I
> > > > > > > > > > > > > > don't get any messages. I don't THINK it's an issue
> > > > > > > > > > > > > > with my Kafka install, for two reasons: 1.
> > > > > > > > > > > > > > Zookeeper logs my client connection. 2. (Granted,
> > > > > > > > > > > > > > it's all on localhost, but) when I used the console
> > > > > > > > > > > > > > consumer and producer on the instance, they seemed
> > > > > > > > > > > > > > to work just fine.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Methodology is to start Zookeeper, start Kafka,
> > > > > > > > > > > > > > start the above application, and then connect a
> > > > > > > > > > > > > > console producer to generate messages. I'm really
> > > > > > > > > > > > > > at a loss as to what's happening. Interestingly, if
> > > > > > > > > > > > > > I put in breakpoints, I seem to lose a handle, as I
> > > > > > > > > > > > > > eventually lose the ability to step over, step
> > > > > > > > > > > > > > into, and so on.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I'd really appreciate any input.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Cheers.
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
