Well, here is probably a better pom for Kafka:

<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.apache.incubator.kafka</groupId>
    <artifactId>kafka</artifactId>
    <packaging>pom</packaging>
    <version>0.7.0-incubating</version>
    <name>Apache Kafka</name>
    <description>Apache Kafka is a distributed publish-subscribe messaging system</description>
    <url>http://incubator.apache.org/kafka</url>
    <inceptionYear>2012</inceptionYear>
    <licenses>
        <license>
            <name>The Apache Software License, Version 2.0</name>
            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
        </license>
    </licenses>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.3.3</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>net.sf.jopt-simple</groupId>
            <artifactId>jopt-simple</artifactId>
            <version>3.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.15</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>zkclient</groupId>
            <artifactId>zkclient</artifactId>
            <version>20110412</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
    <scm>
        <connection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</connection>
        <developerConnection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</developerConnection>
        <url>http://svn.apache.org/repos/asf/incubator/kafka/trunk</url>
    </scm>
</project>

Unfortunately, I still can't get my client to work. I'm at a complete loss
as to how to proceed. Can anyone else run the consumer code presented on
the quickstart page: https://incubator.apache.org/kafka/quickstart.html ?
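
One way to test the missing-dependency theory is to probe the classpath directly from the client project. The following is only a sketch under assumptions: ClasspathCheck and findMissing are hypothetical names, and the probed class names are my guess at one representative class per jar listed in the pom above, not something taken from the Kafka documentation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: reports which of the given classes cannot be loaded.
final class ClasspathCheck {

    /** Returns the subset of classNames that cannot be found on the current classpath. */
    static List<String> findMissing(final List<String> classNames) {
        final List<String> missing = new ArrayList<String>();
        for (final String name : classNames) {
            try {
                // initialize=false: only locate the class, do not run static initializers
                Class.forName(name, false, ClasspathCheck.class.getClassLoader());
            } catch (final ClassNotFoundException e) {
                missing.add(name);
            } catch (final LinkageError e) {
                // covers NoClassDefFoundError raised while linking the class
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(final String[] args) {
        // Assumed representative classes, one per dependency in the pom above.
        final List<String> probes = Arrays.asList(
                "kafka.consumer.ConsumerConfig",   // kafka
                "scala.Option",                    // scala-library
                "org.apache.zookeeper.ZooKeeper",  // zookeeper
                "joptsimple.OptionParser",         // jopt-simple
                "org.apache.log4j.Logger",         // log4j
                "org.I0Itec.zkclient.ZkClient");   // zkclient
        for (final String name : findMissing(probes)) {
            System.err.println("Not on classpath: " + name);
        }
    }
}
```

Running main from the same project as the consumer lists any probe class that fails to load; an empty result would suggest the problem lies somewhere other than a missing jar.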

On Fri, May 11, 2012 at 12:59 PM, lessonz <lessonz.leg...@gmail.com> wrote:

> I realize message.toString() is not the way to access the payload, but
> unless Scala changes this, in Java, if nothing else, toString should use
> Object's implementation which should at least print an object reference. In
> any event, I modified the for-loop to:
>
>
>                         for (final Message message : stream) {
>                             logger.severe("!");
>                             final ByteBuffer buffer = message.payload();
>                             final byte[] bytes = new byte[buffer.remaining()];
>                             buffer.get(bytes);
>                             logger.severe(new String(bytes));
>                         }
>
> And there was no change in behavior. I'm now trying to familiarize myself
> with sbt to try and figure out if I'm missing a dependency. Thanks.
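
For reference, the payload-to-String conversion in the loop quoted above can be pulled into a small helper that names the charset explicitly instead of relying on the platform default. This is a sketch, not Kafka's own utility: PayloadDecoder is a hypothetical name, it works on a plain java.nio.ByteBuffer (the type message.payload() is assigned to in the quoted loop), and UTF-8 is an assumption about how the producer encoded the text.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for turning a message payload into text.
final class PayloadDecoder {

    /** Decodes the remaining bytes of payload without moving the caller's position. */
    static String decode(final ByteBuffer payload, final Charset charset) {
        // duplicate() shares the bytes but has its own position and limit
        final ByteBuffer view = payload.duplicate();
        final byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, charset);
    }

    public static void main(final String[] args) {
        // Stand-in for message.payload(); assumes the producer sent UTF-8 text.
        final ByteBuffer payload =
                ByteBuffer.wrap("hello kafka".getBytes(StandardCharsets.UTF_8));
        System.out.println(decode(payload, StandardCharsets.UTF_8));
    }
}
```

Reading through a duplicate keeps the original buffer's position intact, so the same payload can still be read again afterwards.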
>
>
> On Fri, May 11, 2012 at 10:39 AM, Jun Rao <jun...@gmail.com> wrote:
>
>> Also, message.toString() is not the right way to convert the message to
>> String. You need to take the payload from the message and then convert the
>> bytes to string. Take a look at
>> examples/src/main/java/kafka/examples/ExampleUtils.java
>>
>> Jun
>>
>> On Fri, May 11, 2012 at 9:30 AM, lessonz <lessonz.leg...@gmail.com> wrote:
>>
>> > Upon commenting out the logging output (the first and the identical one in
>> > the finally clause), I'm still not getting anything from the server. I'm
>> > assuming the sample client code works for others. That's why I'm worried my
>> > problem might be more environmental, like a missing dependency. Has anyone
>> > else tested the sample code?
>> >
>> > On Fri, May 11, 2012 at 10:13 AM, Jun Rao <jun...@gmail.com> wrote:
>> >
>> > > Try not to do logger.severe("stream = " + stream). I am not sure if a
>> > > stream is printable.
>> > >
>> > > Jun
>> > >
>> > > On Fri, May 11, 2012 at 8:27 AM, lessonz <lessonz.leg...@gmail.com> wrote:
>> > >
>> > > > Okay, here's the newly amended code:
>> > > >
>> > > > import java.util.Calendar;
>> > > > import java.util.Collections;
>> > > > import java.util.List;
>> > > > import java.util.Map;
>> > > > import java.util.Properties;
>> > > > import java.util.logging.Logger;
>> > > >
>> > > > import kafka.consumer.Consumer;
>> > > > import kafka.consumer.ConsumerConfig;
>> > > > import kafka.consumer.KafkaMessageStream;
>> > > > import kafka.javaapi.consumer.ConsumerConnector;
>> > > > import kafka.message.Message;
>> > > > import kafka.serializer.DefaultDecoder;
>> > > >
>> > > > public class KafkaTestConsumer {
>> > > >
>> > > >    /**
>> > > >     * @param args
>> > > >     */
>> > > >    public static void main(final String[] args) {
>> > > >        final Logger logger = Logger.getLogger("KafkaTestConsumer");
>> > > >         try {
>> > > >
>> > > >            // specify some consumer properties
>> > > >            final Properties props = new Properties();
>> > > >             props.put("zk.connect", "testServer:2181");
>> > > >             props.put("zk.connectiontimeout.ms", "1000000");
>> > > >            props.put("groupid", "test_group");
>> > > >
>> > > >            // Create the connection to the cluster
>> > > >            final ConsumerConfig consumerConfig = new ConsumerConfig(props);
>> > > >            final ConsumerConnector consumerConnector =
>> > > >                    Consumer.createJavaConsumerConnector(consumerConfig);
>> > > >
>> > > >            // create numStreams streams for topic "testTopic"; each stream
>> > > >            // can be consumed by its own thread (only 1 is requested here)
>> > > >            final String topicName = "testTopic";
>> > > >            final int numStreams = 1;
>> > > >            List<KafkaMessageStream<Message>> streams = null;
>> > > >            try {
>> > > >                final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
>> > > >                        consumerConnector.createMessageStreams(
>> > > >                                Collections.singletonMap(topicName, numStreams),
>> > > >                                new DefaultDecoder());
>> > > >                streams = topicMessageStreams.get(topicName);
>> > > >            } catch (final Exception e) {
>> > > >                logger.severe(e.getMessage());
>> > > >            }
>> > > >
>> > > >            final KafkaMessageStream<Message> stream = streams.get(0);
>> > > >
>> > > >            final Thread thread = new Thread(new Runnable() {
>> > > >                 @Override
>> > > >                public void run() {
>> > > >                    try {
>> > > >                        while (true) {
>> > > >                            logger.severe(Calendar.getInstance().getTime().toString());
>> > > >                            if (stream == null) {
>> > > >                                logger.severe("stream is NULL.");
>> > > >                            } else {
>> > > >                                logger.severe("stream = " + stream);
>> > > >                                for (final Message message : stream) {
>> > > >                                    logger.severe("!");
>> > > >                                    logger.severe(message.toString());
>> > > >                                }
>> > > >                            }
>> > > >                        }
>> > > >                    } catch (final Throwable t) {
>> > > >                        logger.severe("In run " + t.getMessage());
>> > > >                    } finally {
>> > > >                        if (stream == null) {
>> > > >                            logger.severe("stream is NULL.");
>> > > >                        } else {
>> > > >                            logger.severe("stream = " + stream);
>> > > >                        }
>> > > >                    }
>> > > >                }
>> > > >            });
>> > > >
>> > > >             thread.start();
>> > > >        } catch (final Throwable t) {
>> > > >            logger.severe("In main " + t.getMessage());
>> > > >        }
>> > > >    }
>> > > > }
>> > > >
>> > > > Behavior is identical to using the executor. I get the logged time, but
>> > > > nothing else. In the debugger, stream is not null, but as soon as I try to
>> > > > reference it in the line "logger.severe("stream = " + stream);" it dies.
>> > > > I don't get that output. So, it seems in both cases, my call to
>> > > > consumerConnector.createMessageStreams(...) is returning a Map of the right
>> > > > size, but its contents appear to be corrupt.
>> > > >
>> > > > I have the source, but I really know nothing about Scala. In looking at
>> > > > kafka.javaapi.consumer.ZookeeperConsumerConnector#createMessageStreams, it
>> > > > all seems pretty straightforward. I have to wonder again about whether I'm
>> > > > somehow throwing a silent class definition not found exception. For
>> > > > dependencies I only have scala-library 2.8.0 and zkclient 0.1. Is that
>> > > > really all that's needed? Are they the right versions?
>> > > >
>> > > > Again, thanks for your help.
>> > > >
>> > > >
>> > > > On Thu, May 10, 2012 at 6:01 PM, Jun Rao <jun...@gmail.com> wrote:
>> > > >
>> > > > > Another thing that I would suggest is to not use Executors and start your
>> > > > > own thread for consumption.
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > > On Thu, May 10, 2012 at 12:14 PM, lessonz <lessonz.leg...@gmail.com> wrote:
>> > > > >
>> > > > > > Yes, get /consumers/test_group/offsets/testTopic/0-0 shows values
>> > > > > > changing. Neither /consumers/[group_id]/ids/[consumer_id] nor
>> > > > > > /consumers/test_group/owners change any values. Hopefully
>> > > > > > that tells us something? Again, thanks for all of your help.
>> > > > > >
>> > > > > >
>> > > > > > On Thu, May 10, 2012 at 10:28 AM, Jun Rao <jun...@gmail.com> wrote:
>> > > > > >
>> > > > > > > So you are saying that the value in
>> > > > > > > /consumers/test_group/offsets/testTopic/0-0
>> > > > > > > is moving? That typically means that the consumer is consuming. What about
>> > > > > > > the values of /consumers/test_group/ids and /consumers/test_group/owners?
>> > > > > > >
>> > > > > > > Jun
>> > > > > > >
>> > > > > > > On Thu, May 10, 2012 at 8:53 AM, lessonz <lessonz.leg...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > Well, that makes more sense. Sorry about that.
>> > > > > > > >
>> > > > > > > > Okay, so here are the results (I subbed in the [client machine name] for
>> > > > > > > > the actual value):
>> > > > > > > >
>> > > > > > > > ls /consumers/test_group/ids
>> > > > > > > > [test_group_[client machine name]-1336664541413-4179e37b]
>> > > > > > > > get /consumers/test_group/ids/test_group_[client machine name]-1336664541413-4179e37b
>> > > > > > > > { "testTopic": 1 }
>> > > > > > > > cZxid = 0xd6
>> > > > > > > > ctime = Thu May 10 09:39:39 MDT 2012
>> > > > > > > > mZxid = 0xd6
>> > > > > > > > mtime = Thu May 10 09:39:39 MDT 2012
>> > > > > > > > pZxid = 0xd6
>> > > > > > > > cversion = 0
>> > > > > > > > dataVersion = 0
>> > > > > > > > aclVersion = 0
>> > > > > > > > ephemeralOwner = 0x137376733c00002
>> > > > > > > > dataLength = 18
>> > > > > > > > numChildren = 0
>> > > > > > > >
>> > > > > > > > and
>> > > > > > > >
>> > > > > > > > ls /consumers/test_group/offsets/testTopic
>> > > > > > > > [0-0]
>> > > > > > > > get /consumers/test_group/offsets/testTopic/0-0
>> > > > > > > > -1
>> > > > > > > > cZxid = 0x1d
>> > > > > > > > ctime = Wed May 09 13:29:24 MDT 2012
>> > > > > > > > mZxid = 0x106
>> > > > > > > > mtime = Thu May 10 09:47:18 MDT 2012
>> > > > > > > > pZxid = 0x1d
>> > > > > > > > cversion = 0
>> > > > > > > > dataVersion = 211
>> > > > > > > > aclVersion = 0
>> > > > > > > > ephemeralOwner = 0x0
>> > > > > > > > dataLength = 2
>> > > > > > > > numChildren = 0
>> > > > > > > >
>> > > > > > > > After sending some messages via the console producer, the only values that
>> > > > > > > > appear to change are (same values omitted):
>> > > > > > > >
>> > > > > > > > get /consumers/test_group/offsets/testTopic/0-0
>> > > > > > > > mZxid = 0x11a
>> > > > > > > > mtime = Thu May 10 09:50:18 MDT 2012
>> > > > > > > > dataVersion = 229
>> > > > > > > >
>> > > > > > > > I'm not sure if that helps any, but I really appreciate your assistance.
>> > > > > > > >
>> > > > > > > > On Wed, May 9, 2012 at 3:08 PM, Jun Rao <jun...@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > > > Run bin/zookeeper-shell.sh. This will bring up an interactive ZK shell.
>> > > > > > > > > Type ? to see the list of commands. You can do things like
>> > > > > > > > > "ls /consumers/[group_id]/ids" and
>> > > > > > > > > "get /consumers/[group_id]/ids/[consumer_id]". Do this while your
>> > > > > > > > > consumer code is running.
>> > > > > > > > >
>> > > > > > > > > Jun
>> > > > > > > > >
>> > > > > > > > > On Wed, May 9, 2012 at 12:53 PM, lessonz <lessonz.leg...@gmail.com> wrote:
>> > > > > > > > >
>> > > > > > > > > > I must apologize for my ignorance. I'm not sure where I should be looking
>> > > > > > > > > > for these values. The bin/zookeeper-shell.sh doesn't have them (I don't
>> > > > > > > > > > think it should). When I cleaned the log directory to start from scratch, I
>> > > > > > > > > > get /tmp/zookeeper/version-2/log.1 and /tmp/zookeeper/version-2/. First,
>> > > > > > > > > > they both appear to be binary; should they be? Since they're binary and I'm
>> > > > > > > > > > using PuTTY, grepping is of limited value
>> > > > > > > > > > (http://www.chiark.greenend.org.uk/~sgtatham/putty/faq.html#faq-puttyputty).
>> > > > > > > > > > The values you mentioned before definitely appear in log.1, but because of
>> > > > > > > > > > the binary, I'm not seeing any assignment operators, and trying to make
>> > > > > > > > > > heads or tails of it is currently beyond me. Also interesting: pretty much
>> > > > > > > > > > immediately, log.1 caps at 67108864 bytes and stays there. Its timestamp
>> > > > > > > > > > is being updated, so it's still getting touched.
>> > > > > > > > > >
>> > > > > > > > > > So, I'm probably looking in the wrong place. Where/How can I find these
>> > > > > > > > > > values?
>> > > > > > > > > >
>> > > > > > > > > > On Wed, May 9, 2012 at 11:09 AM, Jun Rao <jun...@gmail.com> wrote:
>> > > > > > > > > >
>> > > > > > > > > > > I would try the test with numStreams=1 and see what happens. Also, you may
>> > > > > > > > > > > want to produce some new data after the test starts.
>> > > > > > > > > > >
>> > > > > > > > > > > If you still have a problem, could you get the value of the following paths
>> > > > > > > > > > > from ZK (bin/zookeeper-shell.sh) after you started your consumer test?
>> > > > > > > > > > >
>> > > > > > > > > > > /consumers/[group_id]/ids/[consumer_id]
>> > > > > > > > > > >
>> > > > > > > > > > > /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
>> > > > > > > > > > >
>> > > > > > > > > > > Jun
>> > > > > > > > > > >
>> > > > > > > > > > > On Wed, May 9, 2012 at 8:49 AM, lessonz <lessonz.leg...@gmail.com> wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Okay, here is the amended code:
>> > > > > > > > > > > >
>> > > > > > > > > > > > import java.util.Calendar;
>> > > > > > > > > > > > import java.util.Collections;
>> > > > > > > > > > > > import java.util.List;
>> > > > > > > > > > > > import java.util.Map;
>> > > > > > > > > > > > import java.util.Properties;
>> > > > > > > > > > > > import java.util.concurrent.ExecutorService;
>> > > > > > > > > > > > import java.util.concurrent.Executors;
>> > > > > > > > > > > > import java.util.logging.Logger;
>> > > > > > > > > > > >
>> > > > > > > > > > > > import kafka.consumer.Consumer;
>> > > > > > > > > > > > import kafka.consumer.ConsumerConfig;
>> > > > > > > > > > > > import kafka.consumer.KafkaMessageStream;
>> > > > > > > > > > > > import kafka.javaapi.consumer.ConsumerConnector;
>> > > > > > > > > > > > import kafka.message.Message;
>> > > > > > > > > > > > import kafka.serializer.DefaultDecoder;
>> > > > > > > > > > > >
>> > > > > > > > > > > > public class KafkaTestConsumer {
>> > > > > > > > > > > >
>> > > > > > > > > > > >    /**
>> > > > > > > > > > > >     * @param args
>> > > > > > > > > > > >     */
>> > > > > > > > > > > >    public static void main(final String[] args) {
>> > > > > > > > > > > >        try {
>> > > > > > > > > > > >            final Logger logger = Logger.getLogger("KafkaTestConsumer");
>> > > > > > > > > > > >
>> > > > > > > > > > > >            // specify some consumer properties
>> > > > > > > > > > > >            final Properties props = new Properties();
>> > > > > > > > > > > >            props.put("zk.connect", "testserver:2181");
>> > > > > > > > > > > >            props.put("zk.connectiontimeout.ms", "1000000");
>> > > > > > > > > > > >            props.put("groupid", "test_group");
>> > > > > > > > > > > >
>> > > > > > > > > > > >            // Create the connection to the cluster
>> > > > > > > > > > > >            final ConsumerConfig consumerConfig = new ConsumerConfig(props);
>> > > > > > > > > > > >            final ConsumerConnector consumerConnector =
>> > > > > > > > > > > >                    Consumer.createJavaConsumerConnector(consumerConfig);
>> > > > > > > > > > > >
>> > > > > > > > > > > >            // create 4 partitions of the stream for topic "testTopic", to allow
>> > > > > > > > > > > >            // 4 threads to consume
>> > > > > > > > > > > >            final String topicName = "testTopic";
>> > > > > > > > > > > >            final int numStreams = 4;
>> > > > > > > > > > > >            List<KafkaMessageStream<Message>> streams = null;
>> > > > > > > > > > > >            try {
>> > > > > > > > > > > >                final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
>> > > > > > > > > > > >                        consumerConnector.createMessageStreams(
>> > > > > > > > > > > >                                Collections.singletonMap(topicName, numStreams),
>> > > > > > > > > > > >                                new DefaultDecoder());
>> > > > > > > > > > > >                streams = topicMessageStreams.get(topicName);
>> > > > > > > > > > > >            } catch (final Exception e) {
>> > > > > > > > > > > >                logger.severe(e.getMessage());
>> > > > > > > > > > > >            }
>> > > > > > > > > > > >
>> > > > > > > > > > > >            // create list of 4 threads to consume from each of the partitions
>> > > > > > > > > > > >            final ExecutorService executor = Executors.newFixedThreadPool(numStreams);
>> > > > > > > > > > > >
>> > > > > > > > > > > >            // consume the messages in the threads
>> > > > > > > > > > > >            for (final KafkaMessageStream<Message> stream : streams) {
>> > > > > > > > > > > >                executor.submit(new Runnable() {
>> > > > > > > > > > > >                    @Override
>> > > > > > > > > > > >                    public void run() {
>> > > > > > > > > > > >                        try {
>> > > > > > > > > > > >                            while (true) {
>> > > > > > > > > > > >                                logger.severe(Calendar.getInstance().getTime().toString());
>> > > > > > > > > > > >                                if (stream == null) {
>> > > > > > > > > > > >                                    logger.severe("stream is NULL.");
>> > > > > > > > > > > >                                } else {
>> > > > > > > > > > > >                                    logger.severe("stream = " + stream);
>> > > > > > > > > > > >                                    for (final Message message : stream) {
>> > > > > > > > > > > >                                        logger.severe("!");
>> > > > > > > > > > > >                                        logger.severe(message.toString());
>> > > > > > > > > > > >                                    }
>> > > > > > > > > > > >                                }
>> > > > > > > > > > > >                            }
>> > > > > > > > > > > >                        } catch (final Throwable t) {
>> > > > > > > > > > > >                            logger.severe("In run " + t.getMessage());
>> > > > > > > > > > > >                        } finally {
>> > > > > > > > > > > >                            if (stream == null) {
>> > > > > > > > > > > >                                logger.severe("stream is NULL.");
>> > > > > > > > > > > >                            } else {
>> > > > > > > > > > > >                                logger.severe("stream = " + stream);
>> > > > > > > > > > > >                            }
>> > > > > > > > > > > >                        }
>> > > > > > > > > > > >                    }
>> > > > > > > > > > > >                });
>> > > > > > > > > > > >            }
>> > > > > > > > > > > >        } catch (final Throwable t) {
>> > > > > > > > > > > >            System.err.println("In main" + t.getMessage());
>> > > > > > > > > > > >        }
>> > > > > > > > > > > >    }
>> > > > > > > > > > > >
>> > > > > > > > > > > > }
>> > > > > > > > > > > >
>> > > > > > > > > > > > Interesting things happen. I get the time printed out only once for each
>> > > > > > > > > > > > stream. If I use Eclipse's debugging and a breakpoint on the line
>> > > > > > > > > > > > "if (stream == null) {" in the "while (true) {" loop, the variable stream
>> > > > > > > > > > > > says "<error(s)_during_the_evaluation>" for value. If I step over this line,
>> > > > > > > > > > > > I'm taken into the else clause, but the logger is never executed, and seems
>> > > > > > > > > > > > to die when referencing the stream value. So, I got to thinking maybe the
>> > > > > > > > > > > > problem is a missing dependency somewhere or maybe a conflict. So, here are
>> > > > > > > > > > > > the dependencies I have in that project's pom (the project has other
>> > > > > > > > > > > > pieces):
>> > > > > > > > > > > >
>> > > > > > > > > > > >    <dependencies>
>> > > > > > > > > > > >        <!-- Import the JMS API, we use provided scope as the API is
>> > > > > > > > > > > >            included in JBoss AS 7 -->
>> > > > > > > > > > > >        <dependency>
>> > > > > > > > > > > >            <groupId>org.jboss.spec.javax.jms</groupId>
>> > > > > > > > > > > >            <artifactId>jboss-jms-api_1.1_spec</artifactId>
>> > > > > > > > > > > >            <scope>provided</scope>
>> > > > > > > > > > > >        </dependency>
>> > > > > > > > > > > >        <!-- Import the JCA API, we use provided scope as the API is
>> > > > > > > > > > > >            included in JBoss AS 7 -->
>> > > > > > > > > > > >        <dependency>
>> > > > > > > > > > > >            <groupId>org.jboss.spec.javax.resource</groupId>
>> > > > > > > > > > > >            <artifactId>jboss-connector-api_1.6_spec</artifactId>
>> > > > > > > > > > > >            <scope>provided</scope>
>> > > > > > > > > > > >        </dependency>
>> > > > > > > > > > > >        <dependency>
>> > > > > > > > > > > >            <groupId>org.apache.incubator.kafka</groupId>
>> > > > > > > > > > > >            <artifactId>kafka</artifactId>
>> > > > > > > > > > > >            <version>0.7.0-incubating</version>
>> > > > > > > > > > > >        </dependency>
>> > > > > > > > > > > >    </dependencies>
>> > > > > > > > > > > >
>> > > > > > > > > > > > And here is the pom I'm using for kafka:
>> > > > > > > > > > > >
>> > > > > > > > > > > > <?xml version="1.0" encoding="UTF-8"?>
>> > > > > > > > > > > > <project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>> > > > > > > > > > > >          xmlns="http://maven.apache.org/POM/4.0.0"
>> > > > > > > > > > > >          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
>> > > > > > > > > > > >    <modelVersion>4.0.0</modelVersion>
>> > > > > > > > > > > >    <groupId>org.apache.incubator.kafka</groupId>
>> > > > > > > > > > > >    <artifactId>kafka</artifactId>
>> > > > > > > > > > > >    <packaging>pom</packaging>
>> > > > > > > > > > > >    <version>0.7.0-incubating</version>
>> > > > > > > > > > > >    <name>Apache Kafka</name>
>> > > > > > > > > > > >    <description>Apache Kafka is a distributed publish-subscribe messaging system</description>
>> > > > > > > > > > > >    <url>http://incubator.apache.org/kafka</url>
>> > > > > > > > > > > >    <inceptionYear>2012</inceptionYear>
>> > > > > > > > > > > >    <licenses>
>> > > > > > > > > > > >        <license>
>> > > > > > > > > > > >            <name>The Apache Software License, Version 2.0</name>
>> > > > > > > > > > > >            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
>> > > > > > > > > > > >        </license>
>> > > > > > > > > > > >    </licenses>
>> > > > > > > > > > > >    <dependencies>
>> > > > > > > > > > > >       <dependency>
>> > > > > > > > > > > >            <groupId>org.scala-lang</groupId>
>> > > > > > > > > > > >            <artifactId>scala-library</artifactId>
>> > > > > > > > > > > >            <version>2.8.0</version>
>> > > > > > > > > > > >       </dependency>
>> > > > > > > > > > > >       <dependency>
>> > > > > > > > > > > >            <groupId>com.github.sgroschupf</groupId>
>> > > > > > > > > > > >            <artifactId>zkclient</artifactId>
>> > > > > > > > > > > >            <version>0.1</version>
>> > > > > > > > > > > >       </dependency>
>> > > > > > > > > > > >    </dependencies>
>> > > > > > > > > > > >    <scm>
>> > > > > > > > > > > >        <connection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</connection>
>> > > > > > > > > > > >        <developerConnection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</developerConnection>
>> > > > > > > > > > > >        <url>http://svn.apache.org/repos/asf/incubator/kafka/trunk</url>
>> > > > > > > > > > > >    </scm>
>> > > > > > > > > > > > </project>
>> > > > > > > > > > > >
>> > > > > > > > > > > > Again, any input would be greatly appreciated. Thanks.
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Tue, May 8, 2012 at 6:34 PM, Jun Rao <jun...@gmail.com> wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Could you put consumer run() code in try/catch and log all throwables?
>> > > > > > > > > > > > > Executors can eat exceptions.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Jun
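
The point about executors can be reproduced without Kafka. The sketch below uses only java.util.concurrent; SwallowedExceptionDemo and runFailingTask are hypothetical names. A Runnable handed to submit() that throws does not print a stack trace: the exception is stored in the returned Future and only surfaces, wrapped in an ExecutionException, when get() is called.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo of an exception being captured by an executor.
final class SwallowedExceptionDemo {

    /** Submits a task that always fails and returns the failure recovered from its Future. */
    static Throwable runFailingTask() {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            final Future<?> future = executor.submit(new Runnable() {
                @Override
                public void run() {
                    // Nothing is logged for this unless someone inspects the Future.
                    throw new IllegalStateException("boom");
                }
            });
            try {
                future.get();
                return null; // task completed normally
            } catch (final ExecutionException e) {
                return e.getCause(); // the exception thrown inside run()
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(final String[] args) {
        System.out.println("Recovered from Future: " + runFailingTask());
    }
}
```

Either wrapping the body of run() in try/catch, as suggested above, or keeping the Future and calling get() makes such failures visible.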
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Tue, May 8, 2012 at 4:08 PM, lessonz <lessonz.leg...@gmail.com> wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > After trying and failing to get a more complicated consumer working, I
>> > > > > > > > > > > > > > decided to start at square one and use the example code. Below is my barely
>> > > > > > > > > > > > > > modified implementation:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > import java.util.Collections;
>> > > > > > > > > > > > > > import java.util.List;
>> > > > > > > > > > > > > > import java.util.Map;
>> > > > > > > > > > > > > > import java.util.Properties;
>> > > > > > > > > > > > > > import java.util.concurrent.ExecutorService;
>> > > > > > > > > > > > > > import java.util.concurrent.Executors;
>> > > > > > > > > > > > > > import java.util.logging.Logger;
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > import kafka.consumer.Consumer;
>> > > > > > > > > > > > > > import kafka.consumer.ConsumerConfig;
>> > > > > > > > > > > > > > import kafka.consumer.KafkaMessageStream;
>> > > > > > > > > > > > > > import kafka.javaapi.consumer.ConsumerConnector;
>> > > > > > > > > > > > > > import kafka.message.Message;
>> > > > > > > > > > > > > > import kafka.serializer.DefaultDecoder;
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > public class KafkaTestConsumer {
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >    /**
>> > > > > > > > > > > > > >     * @param args
>> > > > > > > > > > > > > >     */
>> > > > > > > > > > > > > >    public static void main(final String[] args)
>> {
>> > > > > > > > > > > > > >        final Logger logger =
>> > > > > > > > > Logger.getLogger("KafkaTestConsumer");
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >        // specify some consumer properties
>> > > > > > > > > > > > > >        final Properties props = new
>> Properties();
>> > > > > > > > > > > > > >        props.put("zk.connect",
>> "testserver:2181");
>> > > > > > > > > > > > > >        props.put("zk.connectiontimeout.ms",
>> > > > "1000000");
>> > > > > > > > > > > > > >        props.put("groupid", "test_group");
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >        // Create the connection to the cluster
>> > > > > > > > > > > > > >        final ConsumerConfig consumerConfig = new
>> > > > > > > > > > > ConsumerConfig(props);
>> > > > > > > > > > > > > >        final ConsumerConnector
>> consumerConnector =
>> > > > > > > > > > > > > >
>> > Consumer.createJavaConsumerConnector(consumerConfig);
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >        // create 4 partitions of the stream for
>> > topic
>> > > > > > > > > “testTopic”,
>> > > > > > > > > > to
>> > > > > > > > > > > > > > allow 4
>> > > > > > > > > > > > > >        // threads to consume
>> > > > > > > > > > > > > >        final String topicName = "testTopic";
>> > > > > > > > > > > > > >        final int numStreams = 4;
>> > > > > > > > > > > > > >        List<KafkaMessageStream<Message>> streams = null;
>> > > > > > > > > > > > > >        try {
>> > > > > > > > > > > > > >            final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
>> > > > > > > > > > > > > >                    consumerConnector.createMessageStreams(
>> > > > > > > > > > > > > >                            Collections.singletonMap(topicName, numStreams),
>> > > > > > > > > > > > > >                            new DefaultDecoder());
>> > > > > > > > > > > > > >            streams = topicMessageStreams.get(topicName);
>> > > > > > > > > > > > > >        } catch (final Exception e) {
>> > > > > > > > > > > > > >            logger.severe(e.getMessage());
>> > > > > > > > > > > > > >        }
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >        // create list of 4 threads to consume from each of the partitions
>> > > > > > > > > > > > > >        final ExecutorService executor = Executors.newFixedThreadPool(numStreams);
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >        // consume the messages in the threads
>> > > > > > > > > > > > > >        for (final KafkaMessageStream<Message> stream : streams) {
>> > > > > > > > > > > > > >            executor.submit(new Runnable() {
>> > > > > > > > > > > > > >                @Override
>> > > > > > > > > > > > > >                public void run() {
>> > > > > > > > > > > > > >                    for (final Message message : stream) {
>> > > > > > > > > > > > > >                        logger.severe("!");
>> > > > > > > > > > > > > >                        logger.severe(message.toString());
>> > > > > > > > > > > > > >                    }
>> > > > > > > > > > > > > >                }
>> > > > > > > > > > > > > >            });
>> > > > > > > > > > > > > >        }
>> > > > > > > > > > > > > >    }
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > }
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > It runs, I get no errors, and nothing happens. I don't get any messages. I
>> > > > > > > > > > > > > > don't THINK it's an issue with my Kafka install for two reasons: 1.
>> > > > > > > > > > > > > > Zookeeper logs my client connection. 2. (Granted it's all on localhost but)
>> > > > > > > > > > > > > > When I used the console consumer and producer on the instance, they seem to
>> > > > > > > > > > > > > > work just fine.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Methodology is to start Zookeeper, start Kafka, start the above application,
>> > > > > > > > > > > > > > and then connect a console producer to generate messages. I'm really at a
>> > > > > > > > > > > > > > loss as to what's happening. Interestingly, if I put in breakpoints, I seem
>> > > > > > > > > > > > > > to lose a handle as I eventually lose the ability to step over, step into,
>> > > > > > > > > > > > > > and so on.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > I'd really appreciate any input.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Cheers.
>> > > > > > > > > > > > > >
>
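
One thing the quoted code cannot reveal is a failure inside the worker tasks: executor.submit() stores any exception thrown by run() in the returned Future, so nothing is logged unless something calls get() on it. Whether that is what is happening in this case is not established by the thread; it is only a possibility worth ruling out. A minimal JDK-only sketch of the behaviour follows. The class name SubmitFailureCheck and the helper failureOf are hypothetical names for illustration, and the failing task is a stand-in for the consumer loop rather than real Kafka code.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper class, not part of Kafka or of the quoted program.
final class SubmitFailureCheck {

    /**
     * Runs the task on a single-thread pool and returns the message of the
     * exception it threw, or null if the task completed normally.
     */
    static String failureOf(final Runnable task) {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // submit() captures a thrown exception in the Future instead of printing it
            final Future<?> future = executor.submit(task);
            future.get(); // rethrows the task's exception wrapped in ExecutionException
            return null;
        } catch (final ExecutionException e) {
            return e.getCause().getMessage();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return "interrupted";
        } finally {
            executor.shutdown();
        }
    }

    public static void main(final String[] args) {
        // Stand-in for the consumer loop: it fails as soon as it starts.
        final Runnable failing = new Runnable() {
            @Override
            public void run() {
                throw new IllegalStateException("simulated consumer failure");
            }
        };
        System.out.println("task failed with: " + failureOf(failing));
    }
}
```

Applied to the quoted program, the same idea would mean keeping the Future returned by each executor.submit(...) call and calling get() on it, or wrapping the body of run() in a try/catch that logs. Note also that if createMessageStreams throws, streams stays null and the for loop over it fails with a NullPointerException.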
