Okay, here's the current client code:

import java.nio.ByteBuffer;
import java.util.Calendar;
import java.util.Collections;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaMessageStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;

public class KafkaTestConsumer {

    /**
     * @param args
     */
    public static void main(final String[] args) {
        try {

            // specify some consumer properties
            final Properties props = new Properties();
            props.put("groupid", "test_group");
            props.put("socket.buffer.size", Integer.toString(2 * 1024 * 1024));
            props.put("fetch.size", Integer.toString(1024 * 1024));
            props.put("auto.commit", "true");
            props.put("autocommit.interval.ms", Integer.toString(10 * 1000));
            props.put("autooffset.reset", "largest");
            props.put("zk.connect", "testServer:2181");

            // Create the connection to the cluster
            final ConsumerConfig consumerConfig = new ConsumerConfig(props);
            final ConsumerConnector consumerConnector =
                    Consumer.createJavaConsumerConnector(consumerConfig);

            // create a single stream for topic "testTopic" and consume it on
            // the main thread (the threaded version is commented out below)
            final String topicName = "testTopic";
            final int numStreams = 1;

            final KafkaMessageStream<Message> stream = consumerConnector
                    .createMessageStreams(Collections.singletonMap(topicName, numStreams))
                    .get(topicName).get(0);

            // final Thread thread = new Thread(new Runnable() {
            // @Override
            // public void run() {
            try {
                while (true) {
                    System.err.println(Calendar.getInstance().getTime().toString());
                    // iterating the stream blocks until a message arrives
                    for (final Message message : stream) {
                        System.err.println("!");
                        // copy the payload bytes out of the message's ByteBuffer
                        final ByteBuffer buffer = message.payload();
                        final byte[] bytes = new byte[buffer.remaining()];
                        buffer.get(bytes);
                        System.err.println(new String(bytes));
                    }
                }
            } catch (final Throwable t) {
                System.err.println("In run " + t.getMessage());
            } finally {
                if (stream == null) {
                    System.err.println("stream is NULL.");
                } else {
                    System.err.println("stream = " + stream);
                }
            }
            // }
            // });
            //
            // thread.start();
        } catch (final Throwable t) {
            System.err.println("In main " + t.getMessage());
        }
    }
}

On launching, zookeeper logs:

[2012-05-14 14:41:51,811] INFO Accepted socket connection from /[clientIP]:51723 (org.apache.zookeeper.server.NIOServerCnxn)
[2012-05-14 14:41:51,815] INFO Client attempting to establish new session at /[clientIP]:51723 (org.apache.zookeeper.server.NIOServerCnxn)
[2012-05-14 14:41:51,819] INFO Established session 0x1374d14f90a0003 with negotiated timeout 6000 for client /[clientIP]:51723 (org.apache.zookeeper.server.NIOServerCnxn)

In the program itself, I reach the line
"System.err.println(Calendar.getInstance().getTime().toString());" exactly
once. Through trial and error, it seems like everything dies as soon as
the "stream" variable is referenced. In any event, I never get any
messages.
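
For what it's worth, to tell apart "the iterator is just blocking, waiting
for messages" from "something actually died", I put together the small
probe below. It's only a sketch: it assumes the 0.7 consumer honors a
"consumer.timeout.ms" property by making the iterator throw
kafka.consumer.ConsumerTimeoutException instead of blocking forever (if
that class or property isn't right for this version, please correct me):

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaMessageStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;

public class KafkaTimeoutProbe {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("zk.connect", "testServer:2181");
        props.put("groupid", "test_group");
        // assumption: a finite timeout makes the iterator throw
        // ConsumerTimeoutException rather than block indefinitely
        props.put("consumer.timeout.ms", "5000");

        final ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        final KafkaMessageStream<Message> stream = connector
                .createMessageStreams(Collections.singletonMap("testTopic", 1))
                .get("testTopic").get(0);

        try {
            for (final Message message : stream) {
                // copy the payload bytes out of the message's ByteBuffer
                final ByteBuffer buffer = message.payload();
                final byte[] bytes = new byte[buffer.remaining()];
                buffer.get(bytes);
                System.err.println("got: " + new String(bytes));
            }
        } catch (final ConsumerTimeoutException e) {
            // nothing arrived within 5 s, but the consumer itself is alive
            System.err.println("timed out waiting for messages");
        }
    }
}

If that prints the timeout message, the stream is alive and simply empty;
if it hangs or dies before that, something else is wrong.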

Again, maybe I'm doing something wrong with regard to project
configuration, or maybe I'm missing some upstream dependency. I don't know.

So, I then decided to install Kafka on a second machine and simply connect
its console consumer, to rule out network issues. I don't get any messages
that way either. In the consumer, I see these two errors (with the INFO
lines between them):

[2012-05-14 14:24:15,310] ERROR error in earliestOrLatestOffset() (kafka.consumer.ZookeeperConsumerConnector)
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect(Native Method)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:54)
        at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:193)
        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:156)
        at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$earliestOrLatestOffset(ZookeeperConsumerConnector.scala:317)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.addPartitionTopicInfo(ZookeeperConsumerConnector.scala:564)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$processPartition(ZookeeperConsumerConnector.scala:548)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$1$$anonfun$apply$9$$anonfun$apply$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:507)
        at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:285)
        at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$1$$anonfun$apply$9.apply(ZookeeperConsumerConnector.scala:504)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$1$$anonfun$apply$9.apply(ZookeeperConsumerConnector.scala:491)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:61)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$1.apply(ZookeeperConsumerConnector.scala:491)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$1.apply(ZookeeperConsumerConnector.scala:477)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:477)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:437)
        at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
        at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:433)
        at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:202)
        at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:111)
        at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:122)
        at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
[2012-05-14 14:24:15,318] INFO Consumer console-consumer-47001_testServer-1337027055000-a01ec4c3 selected partitions : testTopic:0-0: fetched offset = -1: consumed offset = -1 (kafka.consumer.ZookeeperConsumerConnector)
[2012-05-14 14:24:15,324] INFO end rebalancing consumer console-consumer-47001_testServer-1337027055000-a01ec4c3 try #0 (kafka.consumer.ZookeeperConsumerConnector)
[2012-05-14 14:24:15,327] INFO FetchRunnable-0 start fetching topic: testTopic part: 0 offset: -1 from 127.0.0.1:9092 (kafka.consumer.FetcherRunnable)
[2012-05-14 14:24:15,328] ERROR error in FetcherRunnable (kafka.consumer.FetcherRunnable)
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect(Native Method)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:54)
        at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:193)
        at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:120)
        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:64)
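
One thing I do notice in the INFO line above: the fetcher says it is
fetching from 127.0.0.1:9092, which looks odd for a consumer running on a
different machine than the broker. To rule out plain TCP connectivity,
here is the minimal probe I can run from the consumer box; it goes through
the same SocketChannel.connect() call the stack trace shows (the host and
port defaults are placeholders for my setup):

import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

public class BrokerProbe {

    public static void main(final String[] args) throws Exception {
        // placeholders: broker host and port from the fetcher log
        final String host = args.length > 0 ? args[0] : "testServer";
        final int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;

        final SocketChannel channel = SocketChannel.open();
        try {
            // same path the stack trace takes (sun.nio.ch.Net.connect)
            channel.connect(new InetSocketAddress(host, port));
            System.err.println("connected to " + host + ":" + port);
        } finally {
            channel.close();
        }
    }
}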

On the server to which I was attempting to connect, I get a bunch of these
in zookeeper's output:

[2012-05-14 14:33:14,498] INFO Got user-level KeeperException when processing sessionid:0x1374ceff0c0000f type:create cxid:0x12 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/consumers/console-consumer-48481/owners/testTopic Error:KeeperErrorCode = NoNode for /consumers/console-consumer-48481/owners/testTopic (org.apache.zookeeper.server.PrepRequestProcessor)

I actually also see that zookeeper output when I connect a local copy of
the console consumer, so it may not be relevant; the local consumer works
as expected.

I'm really at a loss. Thanks again for your help.
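
P.S. In case it's useful, here is the bare-bones ZooKeeper check I plan to
run next from Java instead of the shell; it just lists the registration
paths discussed earlier in this thread. It assumes the stock
org.apache.zookeeper.ZooKeeper client from the 3.3.x jar, and the paths
are the ones for my test_group/testTopic setup:

import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ConsumerRegistrationCheck {

    public static void main(final String[] args) throws Exception {
        final CountDownLatch connected = new CountDownLatch(1);
        // placeholder connect string: the same host my consumer points at
        final ZooKeeper zk = new ZooKeeper("testServer:2181", 6000, new Watcher() {
            @Override
            public void process(final WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            }
        });
        connected.await();
        try {
            // the paths Jun suggested watching while the consumer runs;
            // getChildren throws KeeperException.NoNodeException if a
            // path does not exist
            for (final String path : new String[] {
                    "/consumers/test_group/ids",
                    "/consumers/test_group/owners",
                    "/consumers/test_group/offsets/testTopic" }) {
                final List<String> children = zk.getChildren(path, false);
                System.err.println(path + " -> " + children);
            }
        } finally {
            zk.close();
        }
    }
}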

On Mon, May 14, 2012 at 9:03 AM, Jun Rao <jun...@gmail.com> wrote:

> Maybe you can describe again the latest (potentially simplest) code that
> you have and the problem that you still see.
>
> Thanks,
>
> Jun
>
> On Fri, May 11, 2012 at 3:47 PM, lessonz <lessonz.leg...@gmail.com> wrote:
>
> > Okay, I replaced every call to logger with System.err.println. Still same
> > behavior. Obviously I must be doing something wrong, but for the life of
> > me I can't figure out what.
> >
> > On Fri, May 11, 2012 at 3:09 PM, Jun Rao <jun...@gmail.com> wrote:
> >
> > > How about using printf, instead of logger?
> > >
> > > Jun
> > >
> > > On Fri, May 11, 2012 at 1:40 PM, lessonz <lessonz.leg...@gmail.com>
> > > wrote:
> > >
> > > > Well, here is probably a better pom for Kafka:
> > > >
> > > > <?xml version="1.0" encoding="UTF-8"?>
> > > > <project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> > > >         http://maven.apache.org/xsd/maven-4.0.0.xsd"
> > > >     xmlns="http://maven.apache.org/POM/4.0.0"
> > > >     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
> > > >     <modelVersion>4.0.0</modelVersion>
> > > >     <groupId>org.apache.incubator.kafka</groupId>
> > > >     <artifactId>kafka</artifactId>
> > > >     <packaging>pom</packaging>
> > > >     <version>0.7.0-incubating</version>
> > > >     <name>Apache Kafka</name>
> > > >     <description>Apache Kafka is a distributed publish-subscribe
> > > >     messaging system</description>
> > > >     <url>http://incubator.apache.org/kafka</url>
> > > >     <inceptionYear>2012</inceptionYear>
> > > >     <licenses>
> > > >         <license>
> > > >             <name>The Apache Software License, Version 2.0</name>
> > > >             <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
> > > >         </license>
> > > >     </licenses>
> > > >     <dependencies>
> > > >         <dependency>
> > > >             <groupId>org.scala-lang</groupId>
> > > >             <artifactId>scala-library</artifactId>
> > > >             <version>2.8.0</version>
> > > >         </dependency>
> > > >         <dependency>
> > > >             <groupId>org.apache.zookeeper</groupId>
> > > >             <artifactId>zookeeper</artifactId>
> > > >             <version>3.3.3</version>
> > > >             <scope>compile</scope>
> > > >         </dependency>
> > > >         <dependency>
> > > >             <groupId>net.sf.jopt-simple</groupId>
> > > >             <artifactId>jopt-simple</artifactId>
> > > >             <version>3.2</version>
> > > >             <scope>compile</scope>
> > > >         </dependency>
> > > >         <dependency>
> > > >             <groupId>log4j</groupId>
> > > >             <artifactId>log4j</artifactId>
> > > >             <version>1.2.15</version>
> > > >             <scope>compile</scope>
> > > >         </dependency>
> > > >         <dependency>
> > > >             <groupId>zkclient</groupId>
> > > >             <artifactId>zkclient</artifactId>
> > > >             <version>20110412</version>
> > > >             <scope>compile</scope>
> > > >         </dependency>
> > > >     </dependencies>
> > > >     <scm>
> > > >         <connection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</connection>
> > > >         <developerConnection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</developerConnection>
> > > >         <url>http://svn.apache.org/repos/asf/incubator/kafka/trunk</url>
> > > >     </scm>
> > > > </project>
> > > >
> > > > Unfortunately, I still can't get my client to work. I'm at a complete
> > > > loss as to how to proceed. Can anyone else run the consumer code
> > > > presented on the quickstart page:
> > > > https://incubator.apache.org/kafka/quickstart.html ?
> > > >
> > > > On Fri, May 11, 2012 at 12:59 PM, lessonz <lessonz.leg...@gmail.com>
> > > > wrote:
> > > >
> > > > > I realize message.toString() is not the way to access the payload,
> > > > > but unless Scala changes this, in Java, if nothing else, toString
> > > > > should use Object's implementation which should at least print an
> > > > > object reference. In any event, I modified the for-loop to:
> > > > >
> > > > >     for (final Message message : stream) {
> > > > >         logger.severe("!");
> > > > >         final ByteBuffer buffer = message.payload();
> > > > >         final byte[] bytes = new byte[buffer.remaining()];
> > > > >         buffer.get(bytes);
> > > > >         logger.severe(new String(bytes));
> > > > >     }
> > > > >
> > > > > And there was no change in behavior. I'm now trying to familiarize
> > > > > myself with sbt to try and figure out if I'm missing a dependency.
> > > > > Thanks.
> > > > >
> > > > >
> > > > > On Fri, May 11, 2012 at 10:39 AM, Jun Rao <jun...@gmail.com> wrote:
> > > > >
> > > > >> Also, message.toString() is not the right way to convert the message
> > > > >> to String. You need to take the payload from the message and then
> > > > >> convert the bytes to string. Take a look at
> > > > >> examples/src/main/java/kafka/examples/ExampleUtils.java
> > > > >>
> > > > >> Jun
> > > > >>
> > > > >> On Fri, May 11, 2012 at 9:30 AM, lessonz <lessonz.leg...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >> > Upon commenting out the logging output (the first and the identical
> > > > >> > one in the finally clause), I'm still not getting anything from the
> > > > >> > server. I'm assuming the sample client code works for others. That's
> > > > >> > why I'm worried my problem might be more environmental, like a
> > > > >> > missing dependency. Has anyone else tested the sample code?
> > > > >> >
> > > > >> > On Fri, May 11, 2012 at 10:13 AM, Jun Rao <jun...@gmail.com> wrote:
> > > > >> >
> > > > >> > > Try not to do logger.severe("stream = " + stream). I am not sure
> > > > >> > > if a stream is printable.
> > > > >> > >
> > > > >> > > Jun
> > > > >> > >
> > > > >> > > On Fri, May 11, 2012 at 8:27 AM, lessonz <lessonz.leg...@gmail.com>
> > > > >> > > wrote:
> > > > >> > >
> > > > >> > > > Okay, here's the newly amended code:
> > > > >> > > >
> > > > >> > > > import java.util.Calendar;
> > > > >> > > > import java.util.Collections;
> > > > >> > > > import java.util.List;
> > > > >> > > > import java.util.Map;
> > > > >> > > > import java.util.Properties;
> > > > >> > > > import java.util.logging.Logger;
> > > > >> > > >
> > > > >> > > > import kafka.consumer.Consumer;
> > > > >> > > > import kafka.consumer.ConsumerConfig;
> > > > >> > > > import kafka.consumer.KafkaMessageStream;
> > > > >> > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > >> > > > import kafka.message.Message;
> > > > >> > > > import kafka.serializer.DefaultDecoder;
> > > > >> > > >
> > > > >> > > > public class KafkaTestConsumer {
> > > > >> > > >
> > > > >> > > >     /**
> > > > >> > > >      * @param args
> > > > >> > > >      */
> > > > >> > > >     public static void main(final String[] args) {
> > > > >> > > >         final Logger logger = Logger.getLogger("KafkaTestConsumer");
> > > > >> > > >         try {
> > > > >> > > >
> > > > >> > > >             // specify some consumer properties
> > > > >> > > >             final Properties props = new Properties();
> > > > >> > > >             props.put("zk.connect", "testServer:2181");
> > > > >> > > >             props.put("zk.connectiontimeout.ms", "1000000");
> > > > >> > > >             props.put("groupid", "test_group");
> > > > >> > > >
> > > > >> > > >             // Create the connection to the cluster
> > > > >> > > >             final ConsumerConfig consumerConfig = new ConsumerConfig(props);
> > > > >> > > >             final ConsumerConnector consumerConnector =
> > > > >> > > >                     Consumer.createJavaConsumerConnector(consumerConfig);
> > > > >> > > >
> > > > >> > > >             // create 4 partitions of the stream for topic “testTopic”,
> > > > >> > > >             // to allow 4 threads to consume
> > > > >> > > >             final String topicName = "testTopic";
> > > > >> > > >             final int numStreams = 1;
> > > > >> > > >             List<KafkaMessageStream<Message>> streams = null;
> > > > >> > > >             try {
> > > > >> > > >                 final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
> > > > >> > > >                         consumerConnector.createMessageStreams(
> > > > >> > > >                                 Collections.singletonMap(topicName, numStreams),
> > > > >> > > >                                 new DefaultDecoder());
> > > > >> > > >                 streams = topicMessageStreams.get(topicName);
> > > > >> > > >             } catch (final Exception e) {
> > > > >> > > >                 logger.severe(e.getMessage());
> > > > >> > > >             }
> > > > >> > > >
> > > > >> > > >             final KafkaMessageStream<Message> stream = streams.get(0);
> > > > >> > > >
> > > > >> > > >             final Thread thread = new Thread(new Runnable() {
> > > > >> > > >                 @Override
> > > > >> > > >                 public void run() {
> > > > >> > > >                     try {
> > > > >> > > >                         while (true) {
> > > > >> > > >                             logger.severe(Calendar.getInstance().getTime().toString());
> > > > >> > > >                             if (stream == null) {
> > > > >> > > >                                 logger.severe("stream is NULL.");
> > > > >> > > >                             } else {
> > > > >> > > >                                 logger.severe("stream = " + stream);
> > > > >> > > >                                 for (final Message message : stream) {
> > > > >> > > >                                     logger.severe("!");
> > > > >> > > >                                     logger.severe(message.toString());
> > > > >> > > >                                 }
> > > > >> > > >                             }
> > > > >> > > >                         }
> > > > >> > > >                     } catch (final Throwable t) {
> > > > >> > > >                         logger.severe("In run " + t.getMessage());
> > > > >> > > >                     } finally {
> > > > >> > > >                         if (stream == null) {
> > > > >> > > >                             logger.severe("stream is NULL.");
> > > > >> > > >                         } else {
> > > > >> > > >                             logger.severe("stream = " + stream);
> > > > >> > > >                         }
> > > > >> > > >                     }
> > > > >> > > >                 }
> > > > >> > > >             });
> > > > >> > > >
> > > > >> > > >             thread.start();
> > > > >> > > >         } catch (final Throwable t) {
> > > > >> > > >             logger.severe("In main" + t.getMessage());
> > > > >> > > >         }
> > > > >> > > >     }
> > > > >> > > > }
> > > > >> > > >
> > > > >> > > > Behavior is identical to using the executor. I get the logged time,
> > > > >> > > > but nothing else. Using debugging, stream is not null, but as soon
> > > > >> > > > as I try to reference it in the line "logger.severe("stream = " +
> > > > >> > > > stream);" it dies. I don't get that output. So, it seems in both
> > > > >> > > > cases, my call to consumerConnector.createMessageStreams(...) is
> > > > >> > > > returning a Map of the right size, but its contents appear to be
> > > > >> > > > corrupt.
> > > > >> > > >
> > > > >> > > > I have the source, but I really know nothing about Scala. In looking
> > > > >> > > > at kafka.javaapi.consumer.ZookeeperConsumerConnector#createMessageStreams,
> > > > >> > > > it all seems pretty straightforward. I have to wonder again about
> > > > >> > > > whether I'm somehow throwing a silent class definition not found
> > > > >> > > > exception. For dependencies I only have scala-library 2.8.0 and
> > > > >> > > > zkclient 0.1. Is that really all that's needed? Are they the right
> > > > >> > > > versions?
> > > > >> > > >
> > > > >> > > > Again, thanks for your help.
> > > > >> > > >
> > > > >> > > > On Thu, May 10, 2012 at 6:01 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > >> > > >
> > > > >> > > > > Another thing that I would suggest is to not use Executors and
> > > > >> > > > > start your own thread for consumption.
> > > > >> > > > >
> > > > >> > > > > Jun
> > > > >> > > > >
> > > > >> > > > > On Thu, May 10, 2012 at 12:14 PM, lessonz <lessonz.leg...@gmail.com>
> > > > >> > > > > wrote:
> > > > >> > > > >
> > > > >> > > > > > Yes, get /consumers/test_group/offsets/testTopic/0-0 shows
> > > > >> > > > > > values changing. Neither /consumers/[group_id]/ids/[consumer_id]
> > > > >> > > > > > nor /consumers/test_group/owners change any values. Hopefully
> > > > >> > > > > > that tells us something? Again, thanks for all of your help.
> > > > >> > > > > >
> > > > >> > > > > > On Thu, May 10, 2012 at 10:28 AM, Jun Rao <jun...@gmail.com>
> > > > >> > > > > > wrote:
> > > > >> > > > > >
> > > > >> > > > > > > So you are saying that the value in
> > > > >> > > > > > > /consumers/test_group/offsets/testTopic/0-0 is moving? That
> > > > >> > > > > > > typically means that the consumer is consuming. What about
> > > > >> > > > > > > the values of /consumers/test_group/ids and
> > > > >> > > > > > > /consumers/test_group/owners?
> > > > >> > > > > > >
> > > > >> > > > > > > Jun
> > > > >> > > > > > >
> > > > >> > > > > > > On Thu, May 10, 2012 at 8:53 AM, lessonz <lessonz.leg...@gmail.com>
> > > > >> > > > > > > wrote:
> > > > >> > > > > > >
> > > > >> > > > > > > > Well, that makes more sense. Sorry about that.
> > > > >> > > > > > > >
> > > > >> > > > > > > > Okay, so here are the results (I subbed in the [client
> > > > >> > > > > > > > machine name] for the actual value):
> > > > >> > > > > > > >
> > > > >> > > > > > > > ls /consumers/test_group/ids
> > > > >> > > > > > > > [test_group_[client machine name]-1336664541413-4179e37b]
> > > > >> > > > > > > > get /consumers/test_group/ids/test_group_[client machine
> > > > >> > > > > > > > name]-1336664541413-4179e37b
> > > > >> > > > > > > > { "testTopic": 1 }
> > > > >> > > > > > > > cZxid = 0xd6
> > > > >> > > > > > > > ctime = Thu May 10 09:39:39 MDT 2012
> > > > >> > > > > > > > mZxid = 0xd6
> > > > >> > > > > > > > mtime = Thu May 10 09:39:39 MDT 2012
> > > > >> > > > > > > > pZxid = 0xd6
> > > > >> > > > > > > > cversion = 0
> > > > >> > > > > > > > dataVersion = 0
> > > > >> > > > > > > > aclVersion = 0
> > > > >> > > > > > > > ephemeralOwner = 0x137376733c00002
> > > > >> > > > > > > > dataLength = 18
> > > > >> > > > > > > > numChildren = 0
> > > > >> > > > > > > >
> > > > >> > > > > > > > and
> > > > >> > > > > > > >
> > > > >> > > > > > > > ls /consumers/test_group/offsets/testTopic
> > > > >> > > > > > > > [0-0]
> > > > >> > > > > > > > get /consumers/test_group/offsets/testTopic/0-0
> > > > >> > > > > > > > -1
> > > > >> > > > > > > > cZxid = 0x1d
> > > > >> > > > > > > > ctime = Wed May 09 13:29:24 MDT 2012
> > > > >> > > > > > > > mZxid = 0x106
> > > > >> > > > > > > > mtime = Thu May 10 09:47:18 MDT 2012
> > > > >> > > > > > > > pZxid = 0x1d
> > > > >> > > > > > > > cversion = 0
> > > > >> > > > > > > > dataVersion = 211
> > > > >> > > > > > > > aclVersion = 0
> > > > >> > > > > > > > ephemeralOwner = 0x0
> > > > >> > > > > > > > dataLength = 2
> > > > >> > > > > > > > numChildren = 0
> > > > >> > > > > > > >
> > > > >> > > > > > > > After sending some messages via the console producer, the
> > > > >> > > > > > > > only values that appear to change are (same values omitted):
> > > > >> > > > > > > >
> > > > >> > > > > > > > get /consumers/test_group/offsets/testTopic/0-0
> > > > >> > > > > > > > mZxid = 0x11a
> > > > >> > > > > > > > mtime = Thu May 10 09:50:18 MDT 2012
> > > > >> > > > > > > > dataVersion = 229
> > > > >> > > > > > > >
> > > > >> > > > > > > > I'm not sure if that helps any, but I really appreciate
> > > > >> > > > > > > > your assistance.
> > > > >> > > > > > > >
> > > > >> > > > > > > > On Wed, May 9, 2012 at 3:08 PM, Jun Rao <jun...@gmail.com>
> > > > >> > > > > > > > wrote:
> > > > >> > > > > > > >
> > > > >> > > > > > > > > Run bin/zookeeper-shell.sh. This will bring up an
> > > > >> > > > > > > > > interactive ZK shell. Type ? to see the list of commands.
> > > > >> > > > > > > > > You can do things like "ls /consumers/[group_id]/ids" and
> > > > >> > > > > > > > > get "/consumers/[group_id]/ids/[consumer_id]". Do this
> > > > >> > > > > > > > > while your consumer code is running.
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > Jun
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > On Wed, May 9, 2012 at 12:53 PM, lessonz <lessonz.leg...@gmail.com>
> > > > >> > > > > > > > > wrote:
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > > I must apologize for my ignorance. I'm not sure where I
> > > > >> > > > > > > > > > should be looking for these values. The
> > > > >> > > > > > > > > > bin/zookeeper-shell.sh doesn't have them (I don't think
> > > > >> > > > > > > > > > it should). When I cleaned the log directory to start
> > > > >> > > > > > > > > > from scratch, I get /tmp/zookeeper/version-2/log.1 and
> > > > >> > > > > > > > > > /tmp/zookeeper/version-2/. First, they both appear to be
> > > > >> > > > > > > > > > binary; should they be? Since they're binary and I'm
> > > > >> > > > > > > > > > using PuTTY, grepping is of limited value
> > > > >> > > > > > > > > > (http://www.chiark.greenend.org.uk/~sgtatham/putty/faq.html#faq-puttyputty).
> > > > >> > > > > > > > > > The values you mentioned before definitely appear in
> > > > >> > > > > > > > > > log.1, but because of the binary, I'm not seeing any
> > > > >> > > > > > > > > > assignment operators, and trying to make heads or tails
> > > > >> > > > > > > > > > of it is currently beyond me. Also interesting, pretty
> > > > >> > > > > > > > > > much immediately log.1 caps at 67108864 bytes and stays
> > > > >> > > > > > > > > > there. Its time stamp is being updated, so it's still
> > > > >> > > > > > > > > > getting touched.
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > So, I'm probably looking in the wrong place. Where/How
> > > > >> > > > > > > > > > can I find these values?
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > On Wed, May 9, 2012 at 11:09 AM, Jun Rao <jun...@gmail.com>
> > > > >> > > > > > > > > > wrote:
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > > I would try the test with numStreams=1 and see what
> > > > >> > > > > > > > > > > happens. Also, you may want to produce some new data
> > > > >> > > > > > > > > > > after the test starts.
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > If you still have problems, could you get the value of
> > > > >> > > > > > > > > > > the following paths from ZK (bin/zookeeper-shell.sh)
> > > > >> > > > > > > > > > > after you started your consumer test?
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > /consumers/[group_id]/ids/[consumer_id]
> > > > >> > > > > > > > > > > /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > Jun
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > On Wed, May 9, 2012 at 8:49 AM, lessonz <lessonz.leg...@gmail.com>
> > > > >> > > > > > > > > > > wrote:
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > > Okay, here is the amended code:
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > import java.util.Calendar;
> > > > >> > > > > > > > > > > > import java.util.Collections;
> > > > >> > > > > > > > > > > > import java.util.List;
> > > > >> > > > > > > > > > > > import java.util.Map;
> > > > >> > > > > > > > > > > > import java.util.Properties;
> > > > >> > > > > > > > > > > > import java.util.concurrent.ExecutorService;
> > > > >> > > > > > > > > > > > import java.util.concurrent.Executors;
> > > > >> > > > > > > > > > > > import java.util.logging.Logger;
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > import kafka.consumer.Consumer;
> > > > >> > > > > > > > > > > > import kafka.consumer.ConsumerConfig;
> > > > >> > > > > > > > > > > > import kafka.consumer.KafkaMessageStream;
> > > > >> > > > > > > > > > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > >> > > > > > > > > > > > import kafka.message.Message;
> > > > >> > > > > > > > > > > > import kafka.serializer.DefaultDecoder;
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > public class KafkaTestConsumer {
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > >     /**
> > > > >> > > > > > > > > > > >      * @param args
> > > > >> > > > > > > > > > > >      */
> > > > >> > > > > > > > > > > >     public static void main(final String[] args) {
> > > > >> > > > > > > > > > > >         try {
> > > > >> > > > > > > > > > > >             final Logger logger = Logger.getLogger("KafkaTestConsumer");
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > >             // specify some consumer properties
> > > > >> > > > > > > > > > > >             final Properties props = new Properties();
> > > > >> > > > > > > > > > > >             props.put("zk.connect", "testserver:2181");
> > > > >> > > > > > > > > > > >             props.put("zk.connectiontimeout.ms", "1000000");
> > > > >> > > > > > > > > > > >             props.put("groupid", "test_group");
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > >             // Create the connection to the cluster
> > > > >> > > > > > > > > > > >             final ConsumerConfig consumerConfig = new ConsumerConfig(props);
> > > > >> > > > > > > > > > > >             final ConsumerConnector consumerConnector =
> > > > >> > > > > > > > > > > >                     Consumer.createJavaConsumerConnector(consumerConfig);
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > >             // create 4 partitions of the stream for topic “testTopic”,
> > > > >> > > > > > > > > > > >             // to allow 4 threads to consume
> > > > >> > > > > > > > > > > >             final String topicName = "testTopic";
> > > > >> > > > > > > > > > > >             final int numStreams = 4;
> > > > >> > > > > > > > > > > >             List<KafkaMessageStream<Message>> streams = null;
> > > > >> > > > > > > > > > > >             try {
> > > > >> > > > > > > > > > > >                 final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
> > > > >> > > > > > > > > > > >                         consumerConnector.createMessageStreams(
> > > > >> > > > > > > > > > > >                                 Collections.singletonMap(topicName, numStreams),
> > > > >> > > > > > > > > > > >                                 new DefaultDecoder());
> > > > >> > > > > > > > > > > >                 streams = topicMessageStreams.get(topicName);
> > > > >> > > > > > > > > > > >             } catch (final Exception e) {
> > > > >> > > > > > > > > > > >                 logger.severe(e.getMessage());
> > > > >> > > > > > > > > > > >             }
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > >             // create list of 4 threads to consume from each of the partitions
> > > > >> > > > > > > > > > > >             final ExecutorService executor = Executors.newFixedThreadPool(numStreams);
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > >             // consume the messages in the threads
> > > > >> > > > > > > > > > > >             for (final KafkaMessageStream<Message> stream : streams) {
> > > > >> > > > > > > > > > > >                 executor.submit(new Runnable() {
> > > > >> > > > > > > > > > > >                     @Override
> > > > >> > > > > > > > > > > >                     public void run() {
> > > > >> > > > > > > > > > > >                         try {
> > > > >> > > > > > > > > > > >                             while (true) {
> > > > >> > > > > > > > > > > >                                 logger.severe(Calendar.getInstance().getTime().toString());
> > > > >> > > > > > > > > > > >                                 if (stream == null) {
> > > > >> > > > > > > > > > > >                                     logger.severe("stream is NULL.");
> > > > >> > > > > > > > > > > >                                 } else {
> > > > >> > > > > > > > > > > >                                     logger.severe("stream = " + stream);
> > > > >> > > > > > > > > > > >                                     for (final Message message : stream) {
> > > > >> > > > > > > > > > > >                                         logger.severe("!");
> > > > >> > > > > > > > > > > >                                         logger.severe(message.toString());
> > > > >> > > > > > > > > > > >                                     }
> > > > >> > > > > > > > > > > >                                 }
> > > > >> > > > > > > > > > > >                             }
> > > > >> > > > > > > > > > > >                         } catch (final Throwable t) {
> > > > >> > > > > > > > > > > >                             logger.severe("In run " + t.getMessage());
> > > > >> > > > > > > > > > > >                         } finally {
> > > > >> > > > > > > > > > > >                             if (stream == null) {
> > > > >> > > > > > > > > > > >                                 logger.severe("stream is NULL.");
> > > > >> > > > > > > > > > > >                             } else {
> > > > >> > > > > > > > > > > >                                 logger.severe("stream = " + stream);
> > > > >> > > > > > > > > > > >                             }
> > > > >> > > > > > > > > > > >                         }
> > > > >> > > > > > > > > > > >                     }
> > > > >> > > > > > > > > > > >                 });
> > > > >> > > > > > > > > > > >             }
> > > > >> > > > > > > > > > > >         } catch (final Throwable t) {
> > > > >> > > > > > > > > > > >             System.err.println("In main" + t.getMessage());
> > > > >> > > > > > > > > > > >         }
> > > > >> > > > > > > > > > > >     }
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > }
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > Interesting things happen. I get the time printed out only once for
> > > > >> > > > > > > > > > > > each stream. If I use eclipse's debugging and a breakpoint on the
> > > > >> > > > > > > > > > > > line "if (stream == null) {" in the "while (true) {" loop, the
> > > > >> > > > > > > > > > > > variable stream says "<error(s)_during_the_evaluation>" for value.
> > > > >> > > > > > > > > > > > If I step over this line, I'm taken into the else clause, but the
> > > > >> > > > > > > > > > > > logger is never executed, and seems to die when referencing the
> > > > >> > > > > > > > > > > > stream value. So, I got to thinking maybe the problem is a missing
> > > > >> > > > > > > > > > > > dependency somewhere or maybe a conflict. So, here are the
> > > > >> > > > > > > > > > > > dependencies I have in that project's pom (the project has other
> > > > >> > > > > > > > > > > > pieces):
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > >     <dependencies>
> > > > >> > > > > > > > > > > >         <!-- Import the JMS API, we use provided scope as the API is
> > > > >> > > > > > > > > > > >              included in JBoss AS 7 -->
> > > > >> > > > > > > > > > > >         <dependency>
> > > > >> > > > > > > > > > > >             <groupId>org.jboss.spec.javax.jms</groupId>
> > > > >> > > > > > > > > > > >             <artifactId>jboss-jms-api_1.1_spec</artifactId>
> > > > >> > > > > > > > > > > >             <scope>provided</scope>
> > > > >> > > > > > > > > > > >         </dependency>
> > > > >> > > > > > > > > > > >         <!-- Import the JCA API, we use provided scope as the API is
> > > > >> > > > > > > > > > > >              included in JBoss AS 7 -->
> > > > >> > > > > > > > > > > >         <dependency>
> > > > >> > > > > > > > > > > >             <groupId>org.jboss.spec.javax.resource</groupId>
> > > > >> > > > > > > > > > > >             <artifactId>jboss-connector-api_1.6_spec</artifactId>
> > > > >> > > > > > > > > > > >             <scope>provided</scope>
> > > > >> > > > > > > > > > > >         </dependency>
> > > > >> > > > > > > > > > > >         <dependency>
> > > > >> > > > > > > > > > > >             <groupId>org.apache.incubator.kafka</groupId>
> > > > >> > > > > > > > > > > >             <artifactId>kafka</artifactId>
> > > > >> > > > > > > > > > > >             <version>0.7.0-incubating</version>
> > > > >> > > > > > > > > > > >         </dependency>
> > > > >> > > > > > > > > > > >     </dependencies>
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > And here is the pom I'm using for kafka:
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > <?xml version="1.0" encoding="UTF-8"?>
> > > > >> > > > > > > > > > > > <project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> > > > >> > > > > > > > > > > >         http://maven.apache.org/xsd/maven-4.0.0.xsd"
> > > > >> > > > > > > > > > > >     xmlns="http://maven.apache.org/POM/4.0.0"
> > > > >> > > > > > > > > > > >     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
> > > > >> > > > > > > > > > > >     <modelVersion>4.0.0</modelVersion>
> > > > >> > > > > > > > > > > >     <groupId>org.apache.incubator.kafka</groupId>
> > > > >> > > > > > > > > > > >     <artifactId>kafka</artifactId>
> > > > >> > > > > > > > > > > >     <packaging>pom</packaging>
> > > > >> > > > > > > > > > > >     <version>0.7.0-incubating</version>
> > > > >> > > > > > > > > > > >     <name>Apache Kafka</name>
> > > > >> > > > > > > > > > > >     <description>Apache Kafka is a distributed publish-subscribe
> > > > >> > > > > > > > > > > >     messaging system</description>
> > > > >> > > > > > > > > > > >     <url>http://incubator.apache.org/kafka</url>
> > > > >> > > > > > > > > > > >     <inceptionYear>2012</inceptionYear>
> > > > >> > > > > > > > > > > >     <licenses>
> > > > >> > > > > > > > > > > >         <license>
> > > > >> > > > > > > > > > > >             <name>The Apache Software License, Version 2.0</name>
> > > > >> > > > > > > > > > > >             <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
> > > > >> > > > > > > > > > > >         </license>
> > > > >> > > > > > > > > > > >     </licenses>
> > > > >> > > > > > > > > > > >     <dependencies>
> > > > >> > > > > > > > > > > >         <dependency>
> > > > >> > > > > > > > > > > >             <groupId>org.scala-lang</groupId>
> > > > >> > > > > > > > > > > >             <artifactId>scala-library</artifactId>
> > > > >> > > > > > > > > > > >             <version>2.8.0</version>
> > > > >> > > > > > > > > > > >         </dependency>
> > > > >> > > > > > > > > > > >         <dependency>
> > > > >> > > > > > > > > > > >             <groupId>com.github.sgroschupf</groupId>
> > > > >> > > > > > > > > > > >             <artifactId>zkclient</artifactId>
> > > > >> > > > > > > > > > > >             <version>0.1</version>
> > > > >> > > > > > > > > > > >         </dependency>
> > > > >> > > > > > > > > > > >     </dependencies>
> > > > >> > > > > > > > > > > >     <scm>
> > > > >> > > > > > > > > > > >         <connection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</connection>
> > > > >> > > > > > > > > > > >         <developerConnection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</developerConnection>
> > > > >> > > > > > > > > > > >         <url>http://svn.apache.org/repos/asf/incubator/kafka/trunk</url>
> > > > >> > > > > > > > > > > >     </scm>
> > > > >> > > > > > > > > > > > </project>
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > Again, any input would be greatly appreciated. Thanks.
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > On Tue, May 8, 2012 at 6:34 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > Could you put consumer run() code in try/catch and
> > > > >> > > > > > > > > > > > > log all throwables? Executors can eat exceptions.
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > Thanks,
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > Jun
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > On Tue, May 8, 2012 at 4:08 PM, lessonz <lessonz.leg...@gmail.com>
> > > > >> > > > > > > > > > > > > wrote:
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > After trying and failing to get a more complicated
> > > > >> > > > > > > > > > > > > > consumer working, I decided to start at square one and
> > > > >> > > > > > > > > > > > > > use the example code. Below is my barely modified
> > > > >> > > > > > > > > > > > > > implementation:
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > import java.util.Collections;
> > > > >> > > > > > > > > > > > > > import java.util.List;
> > > > >> > > > > > > > > > > > > > import java.util.Map;
> > > > >> > > > > > > > > > > > > > import java.util.Properties;
> > > > >> > > > > > > > > > > > > > import java.util.concurrent.ExecutorService;
> > > > >> > > > > > > > > > > > > > import java.util.concurrent.Executors;
> > > > >> > > > > > > > > > > > > > import java.util.logging.Logger;
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > import kafka.consumer.Consumer;
> > > > >> > > > > > > > > > > > > > import kafka.consumer.ConsumerConfig;
> > > > >> > > > > > > > > > > > > > import kafka.consumer.KafkaMessageStream;
> > > > >> > > > > > > > > > > > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > >> > > > > > > > > > > > > > import kafka.message.Message;
> > > > >> > > > > > > > > > > > > > import kafka.serializer.DefaultDecoder;
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > public class KafkaTestConsumer {
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > >     /**
> > > > >> > > > > > > > > > > > > >      * @param args
> > > > >> > > > > > > > > > > > > >      */
> > > > >> > > > > > > > > > > > > >     public static void main(final String[] args) {
> > > > >> > > > > > > > > > > > > >         final Logger logger = Logger.getLogger("KafkaTestConsumer");
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > >         // specify some consumer properties
> > > > >> > > > > > > > > > > > > >         final Properties props = new Properties();
> > > > >> > > > > > > > > > > > > >         props.put("zk.connect", "testserver:2181");
> > > > >> > > > > > > > > > > > > >         props.put("zk.connectiontimeout.ms", "1000000");
> > > > >> > > > > > > > > > > > > >         props.put("groupid", "test_group");
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > >         // Create the connection to the cluster
> > > > >> > > > > > > > > > > > > >         final ConsumerConfig consumerConfig = new ConsumerConfig(props);
> > > > >> > > > > > > > > > > > > >         final ConsumerConnector consumerConnector =
> > > > >> > > > > > > > > > > > > >                 Consumer.createJavaConsumerConnector(consumerConfig);
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > >         // create 4 partitions of the stream for topic “testTopic”,
> > > > >> > > > > > > > > > > > > >         // to allow 4 threads to consume
> > > > >> > > > > > > > > > > > > >         final String topicName = "testTopic";
> > > > >> > > > > > > > > > > > > >         final int numStreams = 4;
> > > > >> > > > > > > > > > > > > >         List<KafkaMessageStream<Message>> streams = null;
> > > > >> > > > > > > > > > > > > >         try {
> > > > >> > > > > > > > > > > > > >             final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
> > > > >> > > > > > > > > > > > > >                     consumerConnector.createMessageStreams(
> > > > >> > > > > > > > > > > > > >                             Collections.singletonMap(topicName, numStreams),
> > > > >> > > > > > > > > > > > > >                             new DefaultDecoder());
> > > > >> > > > > > > > > > > > > >             streams = topicMessageStreams.get(topicName);
> > > > >> > > > > > > > > > > > > >         } catch (final Exception e) {
> > > > >> > > > > > > > > > > > > >             logger.severe(e.getMessage());
> > > > >> > > > > > > > > > > > > >         }
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > >         // create list of 4 threads to consume from each of the partitions
> > > > >> > > > > > > > > > > > > >         final ExecutorService executor = Executors.newFixedThreadPool(numStreams);
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > >         // consume the messages in the threads
> > > > >> > > > > > > > > > > > > >         for (final KafkaMessageStream<Message> stream : streams) {
> > > > >> > > > > > > > > > > > > >             executor.submit(new Runnable() {
> > > > >> > > > > > > > > > > > > >                 @Override
> > > > >> > > > > > > > > > > > > >                 public void run() {
> > > > >> > > > > > > > > > > > > >                     for (final Message message : stream) {
> > > > >> > > > > > > > > > > > > >                         logger.severe("!");
> > > > >> > > > > > > > > > > > > >                         logger.severe(message.toString());
> > > > >> > > > > > > > > > > > > >                     }
> > > > >> > > > > > > > > > > > > >                 }
> > > > >> > > > > > > > > > > > > >             });
> > > > >> > > > > > > > > > > > > >         }
> > > > >> > > > > > > > > > > > > >     }
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > }
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > It runs, I get no errors, and nothing happens. I don't
> > > > >> > > > > > > > > > > > > > get any messages. I don't THINK it's an issue with my
> > > > >> > > > > > > > > > > > > > Kafka install for two reasons: 1. Zookeeper logs my
> > > > >> > > > > > > > > > > > > > client connection. 2. (Granted it's all on localhost
> > > > >> > > > > > > > > > > > > > but) When I used the console consumer and producer on
> > > > >> > > > > > > > > > > > > > the instance, they seem to work just fine.
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > Methodology is to start Zookeeper, start Kafka, start
> > > > >> > > > > > > > > > > > > > above application, and then connect a console producer
> > > > >> > > > > > > > > > > > > > to generate messages. I'm really at a loss as to what's
> > > > >> > > > > > > > > > > > > > happening. Interestingly, if I put in breakpoints, I
> > > > >> > > > > > > > > > > > > > seem to lose a handle as I eventually lose the ability
> > > > >> > > > > > > > > > > > > > to step over, step into, and so on.
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > I'd really appreciate any input.
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > Cheers.
