I must apologize for my ignorance. I'm not sure where I should be looking
for these values. The bin/zookeeper-shell.sh doesn't have them (I don't
think it should). When I cleaned the log directory to start from scratch, I
get /tmp/zookeeper/version-2/log.1 and /tmp/zookeeper/version-2/. First
they both appear to be binary, should they be? Since they're binary and I'm
using PuTTY, grepping is of limited value (
http://www.chiark.greenend.org.uk/~sgtatham/putty/faq.html#faq-puttyputty).
The values you mentioned before definitely appear in log.1, but because of
the binary, I'm not seeing any assignment operators and trying to make
heads or tails of it is currently beyond me. Also interesting, pretty much
immediately log.1 caps at 67108864 bytes and stays there. It's time stamp
is being updated, so it's still getting touched.

So, I'm probably looking in the wrong place. Where/How can I find these
values?

On Wed, May 9, 2012 at 11:09 AM, Jun Rao <jun...@gmail.com> wrote:

> I would try the test with numStreams=1 and see what happens. Also, you may
> want to produce some new data after the test starts.
>
> If you still have problem, could you get the value of the following paths
> from ZK (bin/zookeeper-shell.sh) after you started your consumer test?
>
> /consumers/[group_id]/ids/[consumer_id]
>
> /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
>
> Jun
>
> On Wed, May 9, 2012 at 8:49 AM, lessonz <lessonz.leg...@gmail.com> wrote:
>
> > Okay, here is the amended code:
> >
> > import java.util.Calendar;
> > import java.util.Collections;
> > import java.util.List;
> > import java.util.Map;
> > import java.util.Properties;
> > import java.util.concurrent.ExecutorService;
> > import java.util.concurrent.Executors;
> > import java.util.logging.Logger;
> >
> > import kafka.consumer.Consumer;
> > import kafka.consumer.ConsumerConfig;
> > import kafka.consumer.KafkaMessageStream;
> > import kafka.javaapi.consumer.ConsumerConnector;
> > import kafka.message.Message;
> > import kafka.serializer.DefaultDecoder;
> >
> > public class KafkaTestConsumer {
> >
> >    /**
> >     * @param args
> >     */
> >    public static void main(final String[] args) {
> >         try {
> >             final Logger logger = Logger.getLogger("KafkaTestConsumer");
> >
> >            // specify some consumer properties
> >            final Properties props = new Properties();
> >            props.put("zk.connect", "testserver:2181");
> >            props.put("zk.connectiontimeout.ms", "1000000");
> >            props.put("groupid", "test_group");
> >
> >            // Create the connection to the cluster
> >            final ConsumerConfig consumerConfig = new
> ConsumerConfig(props);
> >            final ConsumerConnector consumerConnector =
> > Consumer.createJavaConsumerConnector(consumerConfig);
> >
> >            // create 4 partitions of the stream for topic “testTopic”, to
> > allow
> >             // 4
> >             // threads to consume
> >            final String topicName = "testTopic";
> >            final int numStreams = 4;
> >            List<KafkaMessageStream<Message>> streams = null;
> >            try {
> >                final Map<String, List<KafkaMessageStream<Message>>>
> > topicMessageStreams = consumerConnector
> >
> > .createMessageStreams(Collections.singletonMap(topicName, numStreams),
> new
> > DefaultDecoder());
> >                streams = topicMessageStreams.get(topicName);
> >            } catch (final Exception e) {
> >                logger.severe(e.getMessage());
> >            }
> >
> >            // create list of 4 threads to consume from each of the
> > partitions
> >            final ExecutorService executor =
> > Executors.newFixedThreadPool(numStreams);
> >
> >            // consume the messages in the threads
> >            for (final KafkaMessageStream<Message> stream : streams) {
> >                executor.submit(new Runnable() {
> >                    @Override
> >                    public void run() {
> >                         try {
> >                            while (true) {
> >
> > logger.severe(Calendar.getInstance().getTime().toString());
> >                                if (stream == null) {
> >                                    logger.severe("stream is NULL.");
> >                                } else {
> >                                    logger.severe("stream = " + stream);
> >                                     for (final Message message : stream)
> {
> >                                        logger.severe("!");
> >                                        logger.severe(message.toString());
> >                                    }
> >                                }
> >                            }
> >                         } catch (final Throwable t) {
> >                            logger.severe("In run " + t.getMessage());
> >                        } finally {
> >                            if (stream == null) {
> >                                logger.severe("stream is NULL.");
> >                            } else {
> >                                logger.severe("stream = " + stream);
> >                            }
> >                        }
> >                    }
> >                });
> >            }
> >        } catch (final Throwable t) {
> >            System.err.println("In main" + t.getMessage());
> >        }
> >    }
> >
> > }
> >
> > Interesting things happen. I get the time printed out only once for each
> > stream. If I use eclipse's debugging and a breakpoint on the line "if
> > (stream == null) {" in the "while (true) {" loop, the variable stream
> says
> > "<error(s)_during_the_evaluation>" for value. If I step over this line,
> I'm
> > taken into the else clause, but the logger is never executed, and seems
> to
> > die when referencing the stream value. So, I got to thinking maybe the
> > problem is a missing dependency somewhere or maybe a conflict. So, here
> are
> > the dependencies I have in that project's pom (the project has other
> > pieces):
> >
> >    <dependencies>
> >        <!-- Import the JMS API, we use provided scope as the API is
> > included in
> >            JBoss AS 7 -->
> >        <dependency>
> >            <groupId>org.jboss.spec.javax.jms</groupId>
> >            <artifactId>jboss-jms-api_1.1_spec</artifactId>
> >            <scope>provided</scope>
> >        </dependency>
> >        <!-- Import the JCA API, we use provided scope as the API is
> > included in
> >            JBoss AS 7 -->
> >        <dependency>
> >            <groupId>org.jboss.spec.javax.resource</groupId>
> >            <artifactId>jboss-connector-api_1.6_spec</artifactId>
> >            <scope>provided</scope>
> >        </dependency>
> >        <dependency>
> >            <groupId>org.apache.incubator.kafka</groupId>
> >            <artifactId>kafka</artifactId>
> >            <version>0.7.0-incubating</version>
> >        </dependency>
> >    </dependencies>
> >
> > And here is the pom I'm using for kafka:
> >
> > <?xml version="1.0" encoding="UTF-8"?>
> > <project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> > http://maven.apache.org/xsd/maven-4.0.0.xsd"; xmlns="
> > http://maven.apache.org/POM/4.0.0"; xmlns:xsi="
> > http://www.w3.org/2001/XMLSchema-instance";>
> >    <modelVersion>4.0.0</modelVersion>
> >    <groupId>org.apache.incubator.kafka</groupId>
> >    <artifactId>kafka</artifactId>
> >    <packaging>pom</packaging>
> >    <version>0.7.0-incubating</version>
> >    <name>Apache Kafka</name>
> >    <description>Apache Kafka is a distributed publish-subscribe messaging
> > system</description>
> >    <url>http://incubator.apache.org/kafka</url>
> >    <inceptionYear>2012</inceptionYear>
> >    <licenses>
> >        <license>
> >            <name>The Apache Software License, Version 2.0</name>
> >            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
> >        </license>
> >    </licenses>
> >    <dependencies>
> >       <dependency>
> >            <groupId>org.scala-lang</groupId>
> >            <artifactId>scala-library</artifactId>
> >            <version>2.8.0</version>
> >       </dependency>
> >       <dependency>
> >            <groupId>com.github.sgroschupf</groupId>
> >            <artifactId>zkclient</artifactId>
> >            <version>0.1</version>
> >       </dependency>
> >    </dependencies>
> >    <scm>
> >        <connection>scm:svn:
> > http://svn.apache.org/repos/asf/incubator/kafka/trunk</connection>
> >        <developerConnection>scm:svn:
> > http://svn.apache.org/repos/asf/incubator/kafka/trunk
> > </developerConnection>
> >        <url>http://svn.apache.org/repos/asf/incubator/kafka/trunk</url>
> >    </scm>
> > </project>
> >
> > Again, any input would be greatly appreciated. Thanks.
> >
> > On Tue, May 8, 2012 at 6:34 PM, Jun Rao <jun...@gmail.com> wrote:
> >
> > > Could you put consumer run() code in try/catch and log all throwables?
> > > Executors can eat exceptions.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, May 8, 2012 at 4:08 PM, lessonz <lessonz.leg...@gmail.com>
> > wrote:
> > >
> > > > After trying and failing to get a more complicated consumer working,
> I
> > > > decided to start at square one and use the example code. Below is my
> > > barely
> > > > modified implementation:
> > > >
> > > > import java.util.Collections;
> > > > import java.util.List;
> > > > import java.util.Map;
> > > > import java.util.Properties;
> > > > import java.util.concurrent.ExecutorService;
> > > > import java.util.concurrent.Executors;
> > > > import java.util.logging.Logger;
> > > >
> > > > import kafka.consumer.Consumer;
> > > > import kafka.consumer.ConsumerConfig;
> > > > import kafka.consumer.KafkaMessageStream;
> > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > import kafka.message.Message;
> > > > import kafka.serializer.DefaultDecoder;
> > > >
> > > > public class KafkaTestConsumer {
> > > >
> > > >    /**
> > > >     * @param args
> > > >     */
> > > >    public static void main(final String[] args) {
> > > >        final Logger logger = Logger.getLogger("KafkaTestConsumer");
> > > >
> > > >        // specify some consumer properties
> > > >        final Properties props = new Properties();
> > > >        props.put("zk.connect", "testserver:2181");
> > > >        props.put("zk.connectiontimeout.ms", "1000000");
> > > >        props.put("groupid", "test_group");
> > > >
> > > >        // Create the connection to the cluster
> > > >        final ConsumerConfig consumerConfig = new
> ConsumerConfig(props);
> > > >        final ConsumerConnector consumerConnector =
> > > > Consumer.createJavaConsumerConnector(consumerConfig);
> > > >
> > > >        // create 4 partitions of the stream for topic “testTopic”, to
> > > > allow 4
> > > >        // threads to consume
> > > >        final String topicName = "testTopic";
> > > >        final int numStreams = 4;
> > > >        List<KafkaMessageStream<Message>> streams = null;
> > > >        try {
> > > >            final Map<String, List<KafkaMessageStream<Message>>>
> > > > topicMessageStreams = consumerConnector
> > > >
> > > > .createMessageStreams(Collections.singletonMap(topicName,
> numStreams),
> > > new
> > > > DefaultDecoder());
> > > >            streams = topicMessageStreams.get(topicName);
> > > >        } catch (final Exception e) {
> > > >            logger.severe(e.getMessage());
> > > >        }
> > > >
> > > >        // create list of 4 threads to consume from each of the
> > partitions
> > > >        final ExecutorService executor =
> > > > Executors.newFixedThreadPool(numStreams);
> > > >
> > > >        // consume the messages in the threads
> > > >        for (final KafkaMessageStream<Message> stream : streams) {
> > > >            executor.submit(new Runnable() {
> > > >                @Override
> > > >                public void run() {
> > > >                    for (final Message message : stream) {
> > > >                        logger.severe("!");
> > > >                        logger.severe(message.toString());
> > > >                    }
> > > >                }
> > > >            });
> > > >        }
> > > >    }
> > > >
> > > > }
> > > >
> > > > It runs, I get no errors, and nothing happens. I don't get any
> > messages.
> > > I
> > > > don't THINK it's an issue with my Kafka install for two reasons: 1.
> > > > Zookeeper logs my client connection. 2. (Granted it's all on
> localhost
> > > but)
> > > > When I used the console consumer and producer on the instance, they
> > seem
> > > to
> > > > work just fine.
> > > >
> > > > Methodology is to start Zookeeper, start Kafka, start above
> > application,
> > > > and then connect a console produce to generate messages. I'm really
> at
> > a
> > > > loss as to what's happening. Interestingly, if I put in breakpoints,
> I
> > > seem
> > > > to lose a handle as I eventually lose the ability to step over, step
> > > into,
> > > > and so on.
> > > >
> > > > I'd really appreciate any input.
> > > >
> > > > Cheers.
> > > >
> > >
> >
>

Reply via email to