Well, that makes more sense. Sorry about that. Okay, so here are the results (I subbed in [client machine name] for the actual value):
    ls /consumers/test_group/ids
    [test_group_[client machine name]-1336664541413-4179e37b]

    get /consumers/test_group/ids/test_group_[client machine name]-1336664541413-4179e37b
    { "testTopic": 1 }
    cZxid = 0xd6
    ctime = Thu May 10 09:39:39 MDT 2012
    mZxid = 0xd6
    mtime = Thu May 10 09:39:39 MDT 2012
    pZxid = 0xd6
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x137376733c00002
    dataLength = 18
    numChildren = 0

and

    ls /consumers/test_group/offsets/testTopic
    [0-0]

    get /consumers/test_group/offsets/testTopic/0-0
    -1
    cZxid = 0x1d
    ctime = Wed May 09 13:29:24 MDT 2012
    mZxid = 0x106
    mtime = Thu May 10 09:47:18 MDT 2012
    pZxid = 0x1d
    cversion = 0
    dataVersion = 211
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 2
    numChildren = 0

After sending some messages via the console producer, the only values that appear to change are (unchanged values omitted):

    get /consumers/test_group/offsets/testTopic/0-0
    mZxid = 0x11a
    mtime = Thu May 10 09:50:18 MDT 2012
    dataVersion = 229

I'm not sure if that helps any, but I really appreciate your assistance.

On Wed, May 9, 2012 at 3:08 PM, Jun Rao <jun...@gmail.com> wrote:

> Run bin/zookeeper-shell.sh. This will bring up an interactive ZK shell.
> Type ? to see the list of commands. You can do things like "ls
> /consumers/[group_id]/ids" and get
> "/consumers/[group_id]/ids/[consumer_id]". Do this while your consumer
> code is running.
>
> Jun
>
> On Wed, May 9, 2012 at 12:53 PM, lessonz <lessonz.leg...@gmail.com> wrote:
>
> > I must apologize for my ignorance. I'm not sure where I should be looking
> > for these values. The bin/zookeeper-shell.sh doesn't have them (I don't
> > think it should). When I cleaned the log directory to start from scratch,
> > I get /tmp/zookeeper/version-2/log.1 and /tmp/zookeeper/version-2/.
> > First, they both appear to be binary; should they be? Since they're
> > binary and I'm using PuTTY, grepping is of limited value
> > (http://www.chiark.greenend.org.uk/~sgtatham/putty/faq.html#faq-puttyputty).
> > The values you mentioned before definitely appear in log.1, but because
> > of the binary format I'm not seeing any assignment operators, and trying
> > to make heads or tails of it is currently beyond me. Also interesting:
> > pretty much immediately, log.1 caps at 67108864 bytes and stays there.
> > Its timestamp is being updated, so it's still getting touched.
> >
> > So, I'm probably looking in the wrong place. Where/how can I find these
> > values?
> >
> > On Wed, May 9, 2012 at 11:09 AM, Jun Rao <jun...@gmail.com> wrote:
> >
> > > I would try the test with numStreams=1 and see what happens. Also, you
> > > may want to produce some new data after the test starts.
> > >
> > > If you still have a problem, could you get the value of the following
> > > paths from ZK (bin/zookeeper-shell.sh) after you started your consumer
> > > test?
> > >
> > > /consumers/[group_id]/ids/[consumer_id]
> > >
> > > /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
> > >
> > > Jun
> > >
> > > On Wed, May 9, 2012 at 8:49 AM, lessonz <lessonz.leg...@gmail.com> wrote:
> > >
> > > > Okay, here is the amended code:
> > > >
> > > > import java.util.Calendar;
> > > > import java.util.Collections;
> > > > import java.util.List;
> > > > import java.util.Map;
> > > > import java.util.Properties;
> > > > import java.util.concurrent.ExecutorService;
> > > > import java.util.concurrent.Executors;
> > > > import java.util.logging.Logger;
> > > >
> > > > import kafka.consumer.Consumer;
> > > > import kafka.consumer.ConsumerConfig;
> > > > import kafka.consumer.KafkaMessageStream;
> > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > import kafka.message.Message;
> > > > import kafka.serializer.DefaultDecoder;
> > > >
> > > > public class KafkaTestConsumer {
> > > >
> > > >     /**
> > > >      * @param args
> > > >      */
> > > >     public static void main(final String[] args) {
> > > >         try {
> > > >             final Logger logger = Logger.getLogger("KafkaTestConsumer");
> > > >
> > > >             // specify some consumer properties
> > > >             final Properties props = new Properties();
> > > >             props.put("zk.connect", "testserver:2181");
> > > >             props.put("zk.connectiontimeout.ms", "1000000");
> > > >             props.put("groupid", "test_group");
> > > >
> > > >             // Create the connection to the cluster
> > > >             final ConsumerConfig consumerConfig = new ConsumerConfig(props);
> > > >             final ConsumerConnector consumerConnector =
> > > >                     Consumer.createJavaConsumerConnector(consumerConfig);
> > > >
> > > >             // create 4 partitions of the stream for topic "testTopic",
> > > >             // to allow 4 threads to consume
> > > >             final String topicName = "testTopic";
> > > >             final int numStreams = 4;
> > > >             List<KafkaMessageStream<Message>> streams = null;
> > > >             try {
> > > >                 final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
> > > >                         consumerConnector.createMessageStreams(
> > > >                                 Collections.singletonMap(topicName, numStreams),
> > > >                                 new DefaultDecoder());
> > > >                 streams = topicMessageStreams.get(topicName);
> > > >             } catch (final Exception e) {
> > > >                 logger.severe(e.getMessage());
> > > >             }
> > > >
> > > >             // create list of 4 threads to consume from each of the
> > > >             // partitions
> > > >             final ExecutorService executor = Executors.newFixedThreadPool(numStreams);
> > > >
> > > >             // consume the messages in the threads
> > > >             for (final KafkaMessageStream<Message> stream : streams) {
> > > >                 executor.submit(new Runnable() {
> > > >                     @Override
> > > >                     public void run() {
> > > >                         try {
> > > >                             while (true) {
> > > >                                 logger.severe(Calendar.getInstance().getTime().toString());
> > > >                                 if (stream == null) {
> > > >                                     logger.severe("stream is NULL.");
> > > >                                 } else {
> > > >                                     logger.severe("stream = " + stream);
> > > >                                     for (final Message message : stream) {
> > > >                                         logger.severe("!");
> > > >                                         logger.severe(message.toString());
> > > >                                     }
> > > >                                 }
> > > >                             }
> > > >                         } catch (final Throwable t) {
> > > >                             logger.severe("In run " + t.getMessage());
> > > >                         } finally {
> > > >                             if (stream == null) {
> > > >                                 logger.severe("stream is NULL.");
> > > >                             } else {
> > > >                                 logger.severe("stream = " + stream);
> > > >                             }
> > > >                         }
> > > >                     }
> > > >                 });
> > > >             }
> > > >         } catch (final Throwable t) {
> > > >             System.err.println("In main" + t.getMessage());
> > > >         }
> > > >     }
> > > >
> > > > }
> > > >
> > > > Interesting things happen. I get the time printed out only once for
> > > > each stream. If I use Eclipse's debugging and a breakpoint on the line
> > > > "if (stream == null) {" in the "while (true) {" loop, the stream
> > > > variable shows "<error(s)_during_the_evaluation>" for its value. If I
> > > > step over this line, I'm taken into the else clause, but the logger is
> > > > never executed and seems to die when referencing the stream value. So,
> > > > I got to thinking maybe the problem is a missing dependency somewhere,
> > > > or maybe a conflict. So, here are the dependencies I have in that
> > > > project's pom (the project has other pieces):
> > > >
> > > > <dependencies>
> > > >     <!-- Import the JMS API; we use provided scope as the API is
> > > >          included in JBoss AS 7 -->
> > > >     <dependency>
> > > >         <groupId>org.jboss.spec.javax.jms</groupId>
> > > >         <artifactId>jboss-jms-api_1.1_spec</artifactId>
> > > >         <scope>provided</scope>
> > > >     </dependency>
> > > >     <!-- Import the JCA API; we use provided scope as the API is
> > > >          included in JBoss AS 7 -->
> > > >     <dependency>
> > > >         <groupId>org.jboss.spec.javax.resource</groupId>
> > > >         <artifactId>jboss-connector-api_1.6_spec</artifactId>
> > > >         <scope>provided</scope>
> > > >     </dependency>
> > > >     <dependency>
> > > >         <groupId>org.apache.incubator.kafka</groupId>
> > > >         <artifactId>kafka</artifactId>
> > > >         <version>0.7.0-incubating</version>
> > > >     </dependency>
> > > > </dependencies>
> > > >
> > > > And here is the pom I'm using for kafka:
> > > > <?xml version="1.0" encoding="UTF-8"?>
> > > > <project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> > > >         http://maven.apache.org/xsd/maven-4.0.0.xsd"
> > > >         xmlns="http://maven.apache.org/POM/4.0.0"
> > > >         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
> > > >     <modelVersion>4.0.0</modelVersion>
> > > >     <groupId>org.apache.incubator.kafka</groupId>
> > > >     <artifactId>kafka</artifactId>
> > > >     <packaging>pom</packaging>
> > > >     <version>0.7.0-incubating</version>
> > > >     <name>Apache Kafka</name>
> > > >     <description>Apache Kafka is a distributed publish-subscribe
> > > >         messaging system</description>
> > > >     <url>http://incubator.apache.org/kafka</url>
> > > >     <inceptionYear>2012</inceptionYear>
> > > >     <licenses>
> > > >         <license>
> > > >             <name>The Apache Software License, Version 2.0</name>
> > > >             <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
> > > >         </license>
> > > >     </licenses>
> > > >     <dependencies>
> > > >         <dependency>
> > > >             <groupId>org.scala-lang</groupId>
> > > >             <artifactId>scala-library</artifactId>
> > > >             <version>2.8.0</version>
> > > >         </dependency>
> > > >         <dependency>
> > > >             <groupId>com.github.sgroschupf</groupId>
> > > >             <artifactId>zkclient</artifactId>
> > > >             <version>0.1</version>
> > > >         </dependency>
> > > >     </dependencies>
> > > >     <scm>
> > > >         <connection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</connection>
> > > >         <developerConnection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</developerConnection>
> > > >         <url>http://svn.apache.org/repos/asf/incubator/kafka/trunk</url>
> > > >     </scm>
> > > > </project>
> > > >
> > > > Again, any input would be greatly appreciated. Thanks.
> > > >
> > > > On Tue, May 8, 2012 at 6:34 PM, Jun Rao <jun...@gmail.com> wrote:
> > > >
> > > > > Could you put the consumer run() code in a try/catch and log all
> > > > > throwables? Executors can eat exceptions.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Tue, May 8, 2012 at 4:08 PM, lessonz <lessonz.leg...@gmail.com> wrote:
> > > > >
> > > > > > After trying and failing to get a more complicated consumer
> > > > > > working, I decided to start at square one and use the example
> > > > > > code. Below is my barely modified implementation:
> > > > > >
> > > > > > import java.util.Collections;
> > > > > > import java.util.List;
> > > > > > import java.util.Map;
> > > > > > import java.util.Properties;
> > > > > > import java.util.concurrent.ExecutorService;
> > > > > > import java.util.concurrent.Executors;
> > > > > > import java.util.logging.Logger;
> > > > > >
> > > > > > import kafka.consumer.Consumer;
> > > > > > import kafka.consumer.ConsumerConfig;
> > > > > > import kafka.consumer.KafkaMessageStream;
> > > > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > > > import kafka.message.Message;
> > > > > > import kafka.serializer.DefaultDecoder;
> > > > > >
> > > > > > public class KafkaTestConsumer {
> > > > > >
> > > > > >     /**
> > > > > >      * @param args
> > > > > >      */
> > > > > >     public static void main(final String[] args) {
> > > > > >         final Logger logger = Logger.getLogger("KafkaTestConsumer");
> > > > > >
> > > > > >         // specify some consumer properties
> > > > > >         final Properties props = new Properties();
> > > > > >         props.put("zk.connect", "testserver:2181");
> > > > > >         props.put("zk.connectiontimeout.ms", "1000000");
> > > > > >         props.put("groupid", "test_group");
> > > > > >
> > > > > >         // Create the connection to the cluster
> > > > > >         final ConsumerConfig consumerConfig = new ConsumerConfig(props);
> > > > > >         final ConsumerConnector consumerConnector =
> > > > > >                 Consumer.createJavaConsumerConnector(consumerConfig);
> > > > > >
> > > > > >         // create 4 partitions of the stream for topic "testTopic",
> > > > > >         // to allow 4 threads to consume
> > > > > >         final String topicName = "testTopic";
> > > > > >         final int numStreams = 4;
> > > > > >         List<KafkaMessageStream<Message>> streams = null;
> > > > > >         try {
> > > > > >             final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
> > > > > >                     consumerConnector.createMessageStreams(
> > > > > >                             Collections.singletonMap(topicName, numStreams),
> > > > > >                             new DefaultDecoder());
> > > > > >             streams = topicMessageStreams.get(topicName);
> > > > > >         } catch (final Exception e) {
> > > > > >             logger.severe(e.getMessage());
> > > > > >         }
> > > > > >
> > > > > >         // create list of 4 threads to consume from each of the
> > > > > >         // partitions
> > > > > >         final ExecutorService executor = Executors.newFixedThreadPool(numStreams);
> > > > > >
> > > > > >         // consume the messages in the threads
> > > > > >         for (final KafkaMessageStream<Message> stream : streams) {
> > > > > >             executor.submit(new Runnable() {
> > > > > >                 @Override
> > > > > >                 public void run() {
> > > > > >                     for (final Message message : stream) {
> > > > > >                         logger.severe("!");
> > > > > >                         logger.severe(message.toString());
> > > > > >                     }
> > > > > >                 }
> > > > > >             });
> > > > > >         }
> > > > > >     }
> > > > > >
> > > > > > }
> > > > > >
> > > > > > It runs, I get no errors, and nothing happens. I don't get any
> > > > > > messages. I don't THINK it's an issue with my Kafka install, for
> > > > > > two reasons: 1. Zookeeper logs my client connection. 2. (Granted,
> > > > > > it's all on localhost, but) when I used the console consumer and
> > > > > > producer on the instance, they seemed to work just fine.
> > > > > >
> > > > > > Methodology is to start Zookeeper, start Kafka, start the above
> > > > > > application, and then connect a console producer to generate
> > > > > > messages. I'm really at a loss as to what's happening.
> > > > > > Interestingly, if I put in breakpoints, I seem to lose a handle,
> > > > > > as I eventually lose the ability to step over, step into, and so
> > > > > > on.
> > > > > >
> > > > > > I'd really appreciate any input.
> > > > > >
> > > > > > Cheers.
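
Jun's point that "executors can eat exceptions" can be demonstrated without Kafka at all. The following is a minimal, self-contained JDK sketch (the class and method names are illustrative, not from the thread): a Throwable thrown inside a Runnable handed to ExecutorService.submit() is captured in the returned Future and never printed or rethrown on its own, which is exactly why a failing consumer loop can look like "nothing happens".

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorSwallowDemo {

    // Submits a Runnable that always throws. submit() wraps the task in a
    // FutureTask, so the exception is stored in the Future rather than
    // reaching the thread's uncaught-exception handler -- nothing is ever
    // printed unless the caller asks for it.
    static String probe() throws InterruptedException {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        final Future<?> result = executor.submit(new Runnable() {
            @Override
            public void run() {
                throw new IllegalStateException("boom");
            }
        });
        String seen = "swallowed";
        try {
            result.get(); // the exception only resurfaces here, wrapped
        } catch (final ExecutionException e) {
            seen = e.getCause().getMessage();
        }
        executor.shutdown();
        return seen;
    }

    public static void main(final String[] args) throws Exception {
        // prints "boom": the failure stays invisible until Future.get()
        System.out.println(probe());
    }
}
```

This is why the suggested try/catch (or Throwable logging) inside run() matters: the consumer code in this thread never calls get() on the Futures it submits, so any error raised in a worker thread would otherwise vanish silently.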