Well, that makes more sense. Sorry about that.

Okay, so here are the results (I subbed in the [client machine name] for
the actual value):

ls /consumers/test_group/ids
[test_group_[client machine name]-1336664541413-4179e37b]
get /consumers/test_group/ids/test_group_[client machine
name]-1336664541413-4179e37b
{ "testTopic": 1 }
cZxid = 0xd6
ctime = Thu May 10 09:39:39 MDT 2012
mZxid = 0xd6
mtime = Thu May 10 09:39:39 MDT 2012
pZxid = 0xd6
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x137376733c00002
dataLength = 18
numChildren = 0

and

ls /consumers/test_group/offsets/testTopic
[0-0]
get /consumers/test_group/offsets/testTopic/0-0
-1
cZxid = 0x1d
ctime = Wed May 09 13:29:24 MDT 2012
mZxid = 0x106
mtime = Thu May 10 09:47:18 MDT 2012
pZxid = 0x1d
cversion = 0
dataVersion = 211
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0

After sending some messages via the console producer, the only values that
appear to change are (same values omitted):

get /consumers/test_group/offsets/testTopic/0-0
mZxid = 0x11a
mtime = Thu May 10 09:50:18 MDT 2012
dataVersion = 229

I'm not sure if that helps any, but I really appreciate your assistance.

On Wed, May 9, 2012 at 3:08 PM, Jun Rao <jun...@gmail.com> wrote:

> Run bin/zookeeper-shell.sh. This will bring up an interactive ZK shell.
> Type ? to see the list of commands. You can do things like "ls
> /consumers/[group_id]/ids" and get "
> /consumers/[group_id]/ids/[consumer_id]". Do this while you consumer code
> is running.
>
> Jun
>
> On Wed, May 9, 2012 at 12:53 PM, lessonz <lessonz.leg...@gmail.com> wrote:
>
> > I must apologize for my ignorance. I'm not sure where I should be looking
> > for these values. The bin/zookeeper-shell.sh doesn't have them (I don't
> > think it should). When I cleaned the log directory to start from
> scratch, I
> > get /tmp/zookeeper/version-2/log.1 and /tmp/zookeeper/version-2/. First
> > they both appear to be binary, should they be? Since they're binary and
> I'm
> > using PuTTY, grepping is of limited value (
> >
> http://www.chiark.greenend.org.uk/~sgtatham/putty/faq.html#faq-puttyputty
> > ).
> > The values you mentioned before definitely appear in log.1, but because
> of
> > the binary, I'm not seeing any assignment operators and trying to make
> > heads or tails of it is currently beyond me. Also interesting, pretty
> much
> > immediately log.1 caps at 67108864 bytes and stays there. It's time stamp
> > is being updated, so it's still getting touched.
> >
> > So, I'm probably looking in the wrong place. Where/How can I find these
> > values?
> >
> > On Wed, May 9, 2012 at 11:09 AM, Jun Rao <jun...@gmail.com> wrote:
> >
> > > I would try the test with numStreams=1 and see what happens. Also, you
> > may
> > > want to produce some new data after the test starts.
> > >
> > > If you still have problem, could you get the value of the following
> paths
> > > from ZK (bin/zookeeper-shell.sh) after you started your consumer test?
> > >
> > > /consumers/[group_id]/ids/[consumer_id]
> > >
> > > /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
> > >
> > > Jun
> > >
> > > On Wed, May 9, 2012 at 8:49 AM, lessonz <lessonz.leg...@gmail.com>
> > wrote:
> > >
> > > > Okay, here is the amended code:
> > > >
> > > > import java.util.Calendar;
> > > > import java.util.Collections;
> > > > import java.util.List;
> > > > import java.util.Map;
> > > > import java.util.Properties;
> > > > import java.util.concurrent.ExecutorService;
> > > > import java.util.concurrent.Executors;
> > > > import java.util.logging.Logger;
> > > >
> > > > import kafka.consumer.Consumer;
> > > > import kafka.consumer.ConsumerConfig;
> > > > import kafka.consumer.KafkaMessageStream;
> > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > import kafka.message.Message;
> > > > import kafka.serializer.DefaultDecoder;
> > > >
> > > > public class KafkaTestConsumer {
> > > >
> > > >    /**
> > > >     * @param args
> > > >     */
> > > >    public static void main(final String[] args) {
> > > >         try {
> > > >             final Logger logger =
> > Logger.getLogger("KafkaTestConsumer");
> > > >
> > > >            // specify some consumer properties
> > > >            final Properties props = new Properties();
> > > >            props.put("zk.connect", "testserver:2181");
> > > >            props.put("zk.connectiontimeout.ms", "1000000");
> > > >            props.put("groupid", "test_group");
> > > >
> > > >            // Create the connection to the cluster
> > > >            final ConsumerConfig consumerConfig = new
> > > ConsumerConfig(props);
> > > >            final ConsumerConnector consumerConnector =
> > > > Consumer.createJavaConsumerConnector(consumerConfig);
> > > >
> > > >            // create 4 partitions of the stream for topic
> “testTopic”,
> > to
> > > > allow
> > > >             // 4
> > > >             // threads to consume
> > > >            final String topicName = "testTopic";
> > > >            final int numStreams = 4;
> > > >            List<KafkaMessageStream<Message>> streams = null;
> > > >            try {
> > > >                final Map<String, List<KafkaMessageStream<Message>>>
> > > > topicMessageStreams = consumerConnector
> > > >
> > > > .createMessageStreams(Collections.singletonMap(topicName,
> numStreams),
> > > new
> > > > DefaultDecoder());
> > > >                streams = topicMessageStreams.get(topicName);
> > > >            } catch (final Exception e) {
> > > >                logger.severe(e.getMessage());
> > > >            }
> > > >
> > > >            // create list of 4 threads to consume from each of the
> > > > partitions
> > > >            final ExecutorService executor =
> > > > Executors.newFixedThreadPool(numStreams);
> > > >
> > > >            // consume the messages in the threads
> > > >            for (final KafkaMessageStream<Message> stream : streams) {
> > > >                executor.submit(new Runnable() {
> > > >                    @Override
> > > >                    public void run() {
> > > >                         try {
> > > >                            while (true) {
> > > >
> > > > logger.severe(Calendar.getInstance().getTime().toString());
> > > >                                if (stream == null) {
> > > >                                    logger.severe("stream is NULL.");
> > > >                                } else {
> > > >                                    logger.severe("stream = " +
> stream);
> > > >                                     for (final Message message :
> > stream)
> > > {
> > > >                                        logger.severe("!");
> > > >
> >  logger.severe(message.toString());
> > > >                                    }
> > > >                                }
> > > >                            }
> > > >                         } catch (final Throwable t) {
> > > >                            logger.severe("In run " + t.getMessage());
> > > >                        } finally {
> > > >                            if (stream == null) {
> > > >                                logger.severe("stream is NULL.");
> > > >                            } else {
> > > >                                logger.severe("stream = " + stream);
> > > >                            }
> > > >                        }
> > > >                    }
> > > >                });
> > > >            }
> > > >        } catch (final Throwable t) {
> > > >            System.err.println("In main" + t.getMessage());
> > > >        }
> > > >    }
> > > >
> > > > }
> > > >
> > > > Interesting things happen. I get the time printed out only once for
> > each
> > > > stream. If I use eclipse's debugging and a breakpoint on the line "if
> > > > (stream == null) {" in the "while (true) {" loop, the variable stream
> > > says
> > > > "<error(s)_during_the_evaluation>" for value. If I step over this
> line,
> > > I'm
> > > > taken into the else clause, but the logger is never executed, and
> seems
> > > to
> > > > die when referencing the stream value. So, I got to thinking maybe
> the
> > > > problem is a missing dependency somewhere or maybe a conflict. So,
> here
> > > are
> > > > the dependencies I have in that project's pom (the project has other
> > > > pieces):
> > > >
> > > >    <dependencies>
> > > >        <!-- Import the JMS API, we use provided scope as the API is
> > > > included in
> > > >            JBoss AS 7 -->
> > > >        <dependency>
> > > >            <groupId>org.jboss.spec.javax.jms</groupId>
> > > >            <artifactId>jboss-jms-api_1.1_spec</artifactId>
> > > >            <scope>provided</scope>
> > > >        </dependency>
> > > >        <!-- Import the JCA API, we use provided scope as the API is
> > > > included in
> > > >            JBoss AS 7 -->
> > > >        <dependency>
> > > >            <groupId>org.jboss.spec.javax.resource</groupId>
> > > >            <artifactId>jboss-connector-api_1.6_spec</artifactId>
> > > >            <scope>provided</scope>
> > > >        </dependency>
> > > >        <dependency>
> > > >            <groupId>org.apache.incubator.kafka</groupId>
> > > >            <artifactId>kafka</artifactId>
> > > >            <version>0.7.0-incubating</version>
> > > >        </dependency>
> > > >    </dependencies>
> > > >
> > > > And here is the pom I'm using for kafka:
> > > >
> > > > <?xml version="1.0" encoding="UTF-8"?>
> > > > <project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> > > > http://maven.apache.org/xsd/maven-4.0.0.xsd"; xmlns="
> > > > http://maven.apache.org/POM/4.0.0"; xmlns:xsi="
> > > > http://www.w3.org/2001/XMLSchema-instance";>
> > > >    <modelVersion>4.0.0</modelVersion>
> > > >    <groupId>org.apache.incubator.kafka</groupId>
> > > >    <artifactId>kafka</artifactId>
> > > >    <packaging>pom</packaging>
> > > >    <version>0.7.0-incubating</version>
> > > >    <name>Apache Kafka</name>
> > > >    <description>Apache Kafka is a distributed publish-subscribe
> > messaging
> > > > system</description>
> > > >    <url>http://incubator.apache.org/kafka</url>
> > > >    <inceptionYear>2012</inceptionYear>
> > > >    <licenses>
> > > >        <license>
> > > >            <name>The Apache Software License, Version 2.0</name>
> > > >            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
> > > >        </license>
> > > >    </licenses>
> > > >    <dependencies>
> > > >       <dependency>
> > > >            <groupId>org.scala-lang</groupId>
> > > >            <artifactId>scala-library</artifactId>
> > > >            <version>2.8.0</version>
> > > >       </dependency>
> > > >       <dependency>
> > > >            <groupId>com.github.sgroschupf</groupId>
> > > >            <artifactId>zkclient</artifactId>
> > > >            <version>0.1</version>
> > > >       </dependency>
> > > >    </dependencies>
> > > >    <scm>
> > > >        <connection>scm:svn:
> > > > http://svn.apache.org/repos/asf/incubator/kafka/trunk</connection>
> > > >        <developerConnection>scm:svn:
> > > > http://svn.apache.org/repos/asf/incubator/kafka/trunk
> > > > </developerConnection>
> > > >        <url>http://svn.apache.org/repos/asf/incubator/kafka/trunk
> > </url>
> > > >    </scm>
> > > > </project>
> > > >
> > > > Again, any input would be greatly appreciated. Thanks.
> > > >
> > > > On Tue, May 8, 2012 at 6:34 PM, Jun Rao <jun...@gmail.com> wrote:
> > > >
> > > > > Could you put consumer run() code in try/catch and log all
> > throwables?
> > > > > Executors can eat exceptions.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Tue, May 8, 2012 at 4:08 PM, lessonz <lessonz.leg...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > After trying and failing to get a more complicated consumer
> > working,
> > > I
> > > > > > decided to start at square one and use the example code. Below is
> > my
> > > > > barely
> > > > > > modified implementation:
> > > > > >
> > > > > > import java.util.Collections;
> > > > > > import java.util.List;
> > > > > > import java.util.Map;
> > > > > > import java.util.Properties;
> > > > > > import java.util.concurrent.ExecutorService;
> > > > > > import java.util.concurrent.Executors;
> > > > > > import java.util.logging.Logger;
> > > > > >
> > > > > > import kafka.consumer.Consumer;
> > > > > > import kafka.consumer.ConsumerConfig;
> > > > > > import kafka.consumer.KafkaMessageStream;
> > > > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > > > import kafka.message.Message;
> > > > > > import kafka.serializer.DefaultDecoder;
> > > > > >
> > > > > > public class KafkaTestConsumer {
> > > > > >
> > > > > >    /**
> > > > > >     * @param args
> > > > > >     */
> > > > > >    public static void main(final String[] args) {
> > > > > >        final Logger logger =
> Logger.getLogger("KafkaTestConsumer");
> > > > > >
> > > > > >        // specify some consumer properties
> > > > > >        final Properties props = new Properties();
> > > > > >        props.put("zk.connect", "testserver:2181");
> > > > > >        props.put("zk.connectiontimeout.ms", "1000000");
> > > > > >        props.put("groupid", "test_group");
> > > > > >
> > > > > >        // Create the connection to the cluster
> > > > > >        final ConsumerConfig consumerConfig = new
> > > ConsumerConfig(props);
> > > > > >        final ConsumerConnector consumerConnector =
> > > > > > Consumer.createJavaConsumerConnector(consumerConfig);
> > > > > >
> > > > > >        // create 4 partitions of the stream for topic
> “testTopic”,
> > to
> > > > > > allow 4
> > > > > >        // threads to consume
> > > > > >        final String topicName = "testTopic";
> > > > > >        final int numStreams = 4;
> > > > > >        List<KafkaMessageStream<Message>> streams = null;
> > > > > >        try {
> > > > > >            final Map<String, List<KafkaMessageStream<Message>>>
> > > > > > topicMessageStreams = consumerConnector
> > > > > >
> > > > > > .createMessageStreams(Collections.singletonMap(topicName,
> > > numStreams),
> > > > > new
> > > > > > DefaultDecoder());
> > > > > >            streams = topicMessageStreams.get(topicName);
> > > > > >        } catch (final Exception e) {
> > > > > >            logger.severe(e.getMessage());
> > > > > >        }
> > > > > >
> > > > > >        // create list of 4 threads to consume from each of the
> > > > partitions
> > > > > >        final ExecutorService executor =
> > > > > > Executors.newFixedThreadPool(numStreams);
> > > > > >
> > > > > >        // consume the messages in the threads
> > > > > >        for (final KafkaMessageStream<Message> stream : streams) {
> > > > > >            executor.submit(new Runnable() {
> > > > > >                @Override
> > > > > >                public void run() {
> > > > > >                    for (final Message message : stream) {
> > > > > >                        logger.severe("!");
> > > > > >                        logger.severe(message.toString());
> > > > > >                    }
> > > > > >                }
> > > > > >            });
> > > > > >        }
> > > > > >    }
> > > > > >
> > > > > > }
> > > > > >
> > > > > > It runs, I get no errors, and nothing happens. I don't get any
> > > > messages.
> > > > > I
> > > > > > don't THINK it's an issue with my Kafka install for two reasons:
> 1.
> > > > > > Zookeeper logs my client connection. 2. (Granted it's all on
> > > localhost
> > > > > but)
> > > > > > When I used the console consumer and producer on the instance,
> they
> > > > seem
> > > > > to
> > > > > > work just fine.
> > > > > >
> > > > > > Methodology is to start Zookeeper, start Kafka, start above
> > > > application,
> > > > > > and then connect a console produce to generate messages. I'm
> really
> > > at
> > > > a
> > > > > > loss as to what's happening. Interestingly, if I put in
> > breakpoints,
> > > I
> > > > > seem
> > > > > > to lose a handle as I eventually lose the ability to step over,
> > step
> > > > > into,
> > > > > > and so on.
> > > > > >
> > > > > > I'd really appreciate any input.
> > > > > >
> > > > > > Cheers.
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to