Okay, here's the current client code: import java.nio.ByteBuffer; import java.util.Calendar; import java.util.Collections; import java.util.Properties;
import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaMessageStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.Message; public class KafkaTestConsumer { /** * @param args */ public static void main(final String[] args) { try { // specify some consumer properties final Properties props = new Properties(); props.put("groupid", "test_group"); props.put("socket.buffer.size", Integer.toString(2 * 1024 * 1024)); props.put("fetch.size", Integer.toString(1024 * 1024)); props.put("auto.commit", "true"); props.put("autocommit.interval.ms", Integer.toString(10 * 1000)); props.put("autooffset.reset", "largest"); props.put("zk.connect", "testServer:2181"); // Create the connection to the cluster final ConsumerConfig consumerConfig = new ConsumerConfig(props); final ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); // create 4 partitions of the stream for topic “testTopic”, to allow // 4 // threads to consume final String topicName = "testTopic"; final int numStreams = 1; final KafkaMessageStream<Message> stream = consumerConnector .createMessageStreams(Collections.singletonMap(topicName, numStreams)).get(topicName).get(0); // final Thread thread = new Thread(new Runnable() { // @Override // public void run() { try { while (true) { System.err.println(Calendar.getInstance().getTime().toString()); for (final Message message : stream) { System.err.println("!"); final ByteBuffer buffer = message.payload(); final byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); System.err.println(new String(bytes)); } } } catch (final Throwable t) { System.err.println("In run " + t.getMessage()); } finally { if (stream == null) { System.err.println("stream is NULL."); } else { System.err.println("stream = " + stream); } } // } // }); // // thread.start(); } catch (final Throwable t) { System.err.println("In main " + t.getMessage()); } } } On launching, zookeeper logs: 
[2012-05-14 14:41:51,811] INFO Accepted socket connection from /[clientIP]:51723 (org.apache.zookeeper.server.NIOServerCnxn)
[2012-05-14 14:41:51,815] INFO Client attempting to establish new session at /[clientIP]:51723 (org.apache.zookeeper.server.NIOServerCnxn)
[2012-05-14 14:41:51,819] INFO Established session 0x1374d14f90a0003 with negotiated timeout 6000 for client /[clientIP]:51723 (org.apache.zookeeper.server.NIOServerCnxn)

In the program itself, I get to the line "System.err.println(Calendar.getInstance().getTime().toString());" exactly once. Through trial and error, it seems like everything dies after attempting to reference the "stream" variable. In any event, I don't get any messages. Again, maybe I'm doing something wrong with regard to project configuration. Maybe I'm missing some upstream dependency. I don't know. So, then I decided to install Kafka on a second machine and simply try to connect its console consumer to rule out network issues. Well, I don't get any messages that way either.
In the consumer, I see these two errors (with the info between them): [2012-05-14 14:24:15,310] ERROR error in earliestOrLatestOffset() (kafka.consumer.ZookeeperConsumerConnector) java.net.ConnectException: Connection refused at sun.nio.ch.Net.connect(Native Method) at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500) at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:54) at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:193) at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:156) at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$earliestOrLatestOffset(ZookeeperConsumerConnector.scala:317) at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.addPartitionTopicInfo(ZookeeperConsumerConnector.scala:564) at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$processPartition(ZookeeperConsumerConnector.scala:548) at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$1$$anonfun$apply$9$$anonfun$apply$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:507) at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:285) at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265) at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$1$$anonfun$apply$9.apply(ZookeeperConsumerConnector.scala:504) at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$1$$anonfun$apply$9.apply(ZookeeperConsumerConnector.scala:491) at scala.collection.mutable.HashSet.foreach(HashSet.scala:61) at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$1.apply(ZookeeperConsumerConnector.scala:491) at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$1.apply(ZookeeperConsumerConnector.scala:477) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) at scala.collection.Iterator$class.foreach(Iterator.scala:631) at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:80) at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:477) at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:437) at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282) at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265) at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:433) at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:202) at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:111) at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:122) at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala) [2012-05-14 14:24:15,318] INFO Consumer console-consumer-47001_testServer-1337027055000-a01ec4c3 selected partitions : testTopic:0-0: fetched offset = -1: consumed offset = -1 
(kafka.consumer.ZookeeperConsumerConnector) [2012-05-14 14:24:15,324] INFO end rebalancing consumer console-consumer-47001_testServer-1337027055000-a01ec4c3 try #0 (kafka.consumer.ZookeeperConsumerConnector) [2012-05-14 14:24:15,327] INFO FetchRunnable-0 start fetching topic: testTopic part: 0 offset: -1 from 127.0.0.1:9092(kafka.consumer.FetcherRunnable) [2012-05-14 14:24:15,328] ERROR error in FetcherRunnable (kafka.consumer.FetcherRunnable) java.net.ConnectException: Connection refused at sun.nio.ch.Net.connect(Native Method) at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500) at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:54) at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:193) at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:120) at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:64) On the server to which I was attempting to connect, I get a bunch of these in zookeeper's output: [2012-05-14 14:33:14,498] INFO Got user-level KeeperException when processing sessionid:0x1374ceff0c0000f type:create cxid:0x12 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/consumers/console-consumer-48481/owners/testTopic Error:KeeperErrorCode = NoNode for /consumers/console-consumer-48481/owners/testTopic (org.apache.zookeeper.server.PrepRequestProcessor) The zookeeper output I actually also see when I connect a local copy of the console consumer. So, that may not be relevant as the local consumer works as expected. I'm really at a loss. Thanks again for your help. On Mon, May 14, 2012 at 9:03 AM, Jun Rao <jun...@gmail.com> wrote: > Maybe you can describe again the latest (potentially simplest) code that > you have and the problem that you still see. > > Thanks, > > Jun > > On Fri, May 11, 2012 at 3:47 PM, lessonz <lessonz.leg...@gmail.com> wrote: > > > Okay, I replaced every call to logger with System.err.println. Still same > > behavior. 
Obviously I must be doing something wrong, but for the life of > me > > I can't figure out what. > > > > On Fri, May 11, 2012 at 3:09 PM, Jun Rao <jun...@gmail.com> wrote: > > > > > How about using printf, instead of logger? > > > > > > Jun > > > > > > On Fri, May 11, 2012 at 1:40 PM, lessonz <lessonz.leg...@gmail.com> > > wrote: > > > > > > > Well, here is probably a better pom for Kafka: > > > > > > > > <?xml version="1.0" encoding="UTF-8"?> > > > > <project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 > > > > http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns=" > > > > http://maven.apache.org/POM/4.0.0" xmlns:xsi=" > > > > http://www.w3.org/2001/XMLSchema-instance"> > > > > <modelVersion>4.0.0</modelVersion> > > > > <groupId>org.apache.incubator.kafka</groupId> > > > > <artifactId>kafka</artifactId> > > > > <packaging>pom</packaging> > > > > <version>0.7.0-incubating</version> > > > > <name>Apache Kafka</name> > > > > <description>Apache Kafka is a distributed publish-subscribe > > messaging > > > > system</description> > > > > <url>http://incubator.apache.org/kafka</url> > > > > <inceptionYear>2012</inceptionYear> > > > > <licenses> > > > > <license> > > > > <name>The Apache Software License, Version 2.0</name> > > > > <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> > > > > </license> > > > > </licenses> > > > > <dependencies> > > > > <dependency> > > > > <groupId>org.scala-lang</groupId> > > > > <artifactId>scala-library</artifactId> > > > > <version>2.8.0</version> > > > > </dependency> > > > > <dependency> > > > > <groupId>org.apache.zookeeper</groupId> > > > > <artifactId>zookeeper</artifactId> > > > > <version>3.3.3</version> > > > > <scope>compile</scope> > > > > </dependency> > > > > <dependency> > > > > <groupId>net.sf.jopt-simple</groupId> > > > > <artifactId>jopt-simple</artifactId> > > > > <version>3.2</version> > > > > <scope>compile</scope> > > > > </dependency> > > > > <dependency> > > > > <groupId>log4j</groupId> > > > > 
<artifactId>log4j</artifactId> > > > > <version>1.2.15</version> > > > > <scope>compile</scope> > > > > </dependency> > > > > <dependency> > > > > <groupId>zkclient</groupId> > > > > <artifactId>zkclient</artifactId> > > > > <version>20110412</version> > > > > <scope>compile</scope> > > > > </dependency> > > > > </dependencies> > > > > <scm> > > > > <connection>scm:svn: > > > > http://svn.apache.org/repos/asf/incubator/kafka/trunk</connection> > > > > <developerConnection>scm:svn: > > > > http://svn.apache.org/repos/asf/incubator/kafka/trunk > > > > </developerConnection> > > > > <url>http://svn.apache.org/repos/asf/incubator/kafka/trunk > > </url> > > > > </scm> > > > > </project> > > > > > > > > Unfortunately, I still can't get my client to work. I'm at a complete > > > loss > > > > as for how to proceed. Can anyone else run the consumer code > presented > > on > > > > the quickstart page: > > https://incubator.apache.org/kafka/quickstart.html? > > > > > > > > On Fri, May 11, 2012 at 12:59 PM, lessonz <lessonz.leg...@gmail.com> > > > > wrote: > > > > > > > > > I realize message.toString() is not the way to access the payload, > > but > > > > > unless Scala changes this, in Java, if nothing else, toString > should > > > use > > > > > Object's implementation which should at least print an object > > > reference. > > > > In > > > > > any event, I modified the for-loop to: > > > > > > > > > > > > > > > for (final Message message : stream) { > > > > > logger.severe("!"); > > > > > final ByteBuffer buffer = > > > message.payload(); > > > > > final byte[] bytes = new > > > > > byte[buffer.remaining()]; > > > > > buffer.get(bytes); > > > > > logger.severe(new String(bytes)); > > > > > } > > > > > > > > > > And there was no change in behavior. I'm now trying to familiarize > > > myself > > > > > with sbt to try and figure out if I'm missing a dependency. Thanks. 
> > > > > > > > > > > > > > > On Fri, May 11, 2012 at 10:39 AM, Jun Rao <jun...@gmail.com> > wrote: > > > > > > > > > >> Also, message.toString() is not the right way to convert the > message > > > to > > > > >> String. You need to take the payload from the message and then > > convert > > > > the > > > > >> bytes to string. Take a look at > > > > >> examples/src/main/java/kafka/examples/ExampleUtils.java > > > > >> > > > > >> Jun > > > > >> > > > > >> On Fri, May 11, 2012 at 9:30 AM, lessonz < > lessonz.leg...@gmail.com> > > > > >> wrote: > > > > >> > > > > >> > Upon commenting out the logging output (the first and the > > identical > > > > one > > > > >> in > > > > >> > the finally clause), I'm still not getting anything from the > > server. > > > > I'm > > > > >> > assuming the sample client code works for others. That's why I'm > > > > >> worried my > > > > >> > problem might be more environmental, like a missing dependency. > > Has > > > > >> anyone > > > > >> > else tested the sample code? > > > > >> > > > > > >> > On Fri, May 11, 2012 at 10:13 AM, Jun Rao <jun...@gmail.com> > > wrote: > > > > >> > > > > > >> > > Try not to do logger.severe("stream = " + stream). I am not > sure > > > if > > > > a > > > > >> > > stream is printable. 
> > > > >> > > > > > > >> > > Jun > > > > >> > > > > > > >> > > On Fri, May 11, 2012 at 8:27 AM, lessonz < > > > lessonz.leg...@gmail.com> > > > > >> > wrote: > > > > >> > > > > > > >> > > > Okay, here's the newly amended code: > > > > >> > > > > > > > >> > > > import java.util.Calendar; > > > > >> > > > import java.util.Collections; > > > > >> > > > import java.util.List; > > > > >> > > > import java.util.Map; > > > > >> > > > import java.util.Properties; > > > > >> > > > import java.util.logging.Logger; > > > > >> > > > > > > > >> > > > import kafka.consumer.Consumer; > > > > >> > > > import kafka.consumer.ConsumerConfig; > > > > >> > > > import kafka.consumer.KafkaMessageStream; > > > > >> > > > import kafka.javaapi.consumer.ConsumerConnector; > > > > >> > > > import kafka.message.Message; > > > > >> > > > import kafka.serializer.DefaultDecoder; > > > > >> > > > > > > > >> > > > public class KafkaTestConsumer { > > > > >> > > > > > > > >> > > > /** > > > > >> > > > * @param args > > > > >> > > > */ > > > > >> > > > public static void main(final String[] args) { > > > > >> > > > final Logger logger = > > > > Logger.getLogger("KafkaTestConsumer"); > > > > >> > > > try { > > > > >> > > > > > > > >> > > > // specify some consumer properties > > > > >> > > > final Properties props = new Properties(); > > > > >> > > > props.put("zk.connect", "testServer:2181"); > > > > >> > > > props.put("zk.connectiontimeout.ms", > "1000000"); > > > > >> > > > props.put("groupid", "test_group"); > > > > >> > > > > > > > >> > > > // Create the connection to the cluster > > > > >> > > > final ConsumerConfig consumerConfig = new > > > > >> > > ConsumerConfig(props); > > > > >> > > > final ConsumerConnector consumerConnector = > > > > >> > > > Consumer.createJavaConsumerConnector(consumerConfig); > > > > >> > > > > > > > >> > > > // create 4 partitions of the stream for topic > > > > >> “testTopic”, > > > > >> > to > > > > >> > > > allow > > > > >> > > > // 4 > > > > >> > > > // threads 
to consume > > > > >> > > > final String topicName = "testTopic"; > > > > >> > > > final int numStreams = 1; > > > > >> > > > List<KafkaMessageStream<Message>> streams = null; > > > > >> > > > try { > > > > >> > > > final Map<String, > > > > List<KafkaMessageStream<Message>>> > > > > >> > > > topicMessageStreams = consumerConnector > > > > >> > > > > > > > >> > > > .createMessageStreams(Collections.singletonMap(topicName, > > > > >> numStreams), > > > > >> > > new > > > > >> > > > DefaultDecoder()); > > > > >> > > > streams = topicMessageStreams.get(topicName); > > > > >> > > > } catch (final Exception e) { > > > > >> > > > logger.severe(e.getMessage()); > > > > >> > > > } > > > > >> > > > > > > > >> > > > final KafkaMessageStream<Message> stream = > > > > >> streams.get(0); > > > > >> > > > > > > > >> > > > final Thread thread = new Thread(new Runnable() { > > > > >> > > > @Override > > > > >> > > > public void run() { > > > > >> > > > try { > > > > >> > > > while (true) { > > > > >> > > > > > > > >> > > > logger.severe(Calendar.getInstance().getTime().toString()); > > > > >> > > > if (stream == null) { > > > > >> > > > logger.severe("stream is > > NULL."); > > > > >> > > > } else { > > > > >> > > > logger.severe("stream = " + > > > > stream); > > > > >> > > > for (final Message message : > > > > stream) > > > > >> { > > > > >> > > > logger.severe("!"); > > > > >> > > > > > > > >> logger.severe(message.toString()); > > > > >> > > > } > > > > >> > > > } > > > > >> > > > } > > > > >> > > > } catch (final Throwable t) { > > > > >> > > > logger.severe("In run " + > > > t.getMessage()); > > > > >> > > > } finally { > > > > >> > > > if (stream == null) { > > > > >> > > > logger.severe("stream is NULL."); > > > > >> > > > } else { > > > > >> > > > logger.severe("stream = " + > > stream); > > > > >> > > > } > > > > >> > > > } > > > > >> > > > } > > > > >> > > > }); > > > > >> > > > > > > > >> > > > thread.start(); > > > > >> > > > } catch (final Throwable t) { > > > > >> 
> > > logger.severe("In main" + t.getMessage()); > > > > >> > > > } > > > > >> > > > } > > > > >> > > > } > > > > >> > > > > > > > >> > > > Behavior is identical to using the executor. I get the > logged > > > > time, > > > > >> but > > > > >> > > > nothing else. Using debugging stream is not null, but as > soon > > > as I > > > > >> try > > > > >> > to > > > > >> > > > reference it in the line "logger.severe("stream = " + > > stream);" > > > it > > > > >> > dies. > > > > >> > > I > > > > >> > > > don't get that output. So, it seems in both cases, my call > to > > > > >> > > > consumerConnector.createMessageStreams(...) is returning a > Map > > > of > > > > >> the > > > > >> > > right > > > > >> > > > size, but its contents appear to be corrupt. > > > > >> > > > > > > > >> > > > I have the source, but I really know nothing about Scala. In > > > > >> looking at > > > > >> > > > > > > > >> > > > kafka.javaapi.consumer.ZookeeperConsumerConnector#createMessageStreams, > > > > >> > > it > > > > >> > > > all seems pretty straightforward. I have to wonder again > about > > > > >> whether > > > > >> > > I'm > > > > >> > > > somehow throwing a silent class definition not found > > exception. > > > > For > > > > >> > > > dependencies I only have scala-library 2.8.0 and zkclient > 0.1. > > > Is > > > > >> that > > > > >> > > > really all that's needed? Are they the right versions? > > > > >> > > > > > > > >> > > > Again, thanks for your help. > > > > >> > > > > > > > >> > > > > > > > >> > > > On Thu, May 10, 2012 at 6:01 PM, Jun Rao <jun...@gmail.com> > > > > wrote: > > > > >> > > > > > > > >> > > > > Another thing that I would suggest is to not use Executors > > and > > > > >> start > > > > >> > > your > > > > >> > > > > own thread for consumption. 
> > > > >> > > > > > > > > >> > > > > Jun > > > > >> > > > > > > > > >> > > > > On Thu, May 10, 2012 at 12:14 PM, lessonz < > > > > >> lessonz.leg...@gmail.com> > > > > >> > > > > wrote: > > > > >> > > > > > > > > >> > > > > > Yes, get /consumers/test_group/offsets/ > > > > >> > > > > > testTopic/0-0 shows values changing. Neither > > > > >> > > > /consumers/[group_id]/ids/[ > > > > >> > > > > > consumer_id] nor /consumers/test_group/owners change any > > > > values. > > > > >> > > > > Hopefully > > > > >> > > > > > that tells us something? Again, thanks for all of your > > help. > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > On Thu, May 10, 2012 at 10:28 AM, Jun Rao < > > jun...@gmail.com > > > > > > > > >> > wrote: > > > > >> > > > > > > > > > >> > > > > > > So you are saying that the value in > > > > >> > > > > > > /consumers/test_group/offsets/testTopic/0-0 > > > > >> > > > > > > is moving? That typically means that the consumer is > > > > >> consuming. > > > > >> > > What > > > > >> > > > > > about > > > > >> > > > > > > the values of /consumers/test_group//ids and > > > > >> > > > > /consumers/test_group/owner? > > > > >> > > > > > > > > > > >> > > > > > > Jun > > > > >> > > > > > > > > > > >> > > > > > > On Thu, May 10, 2012 at 8:53 AM, lessonz < > > > > >> > lessonz.leg...@gmail.com > > > > >> > > > > > > > >> > > > > > wrote: > > > > >> > > > > > > > > > > >> > > > > > > > Well, that makes more sense. Sorry about that. 
> > > > >> > > > > > > > > > > > >> > > > > > > > Okay, so here are the results (I subbed in the > [client > > > > >> machine > > > > >> > > > name] > > > > >> > > > > > for > > > > >> > > > > > > > the actual value): > > > > >> > > > > > > > > > > > >> > > > > > > > ls /consumers/test_group/ids > > > > >> > > > > > > > [test_group_[client machine > > > name]-1336664541413-4179e37b] > > > > >> > > > > > > > get /consumers/test_group/ids/test_group_[client > > machine > > > > >> > > > > > > > name]-1336664541413-4179e37b > > > > >> > > > > > > > { "testTopic": 1 } > > > > >> > > > > > > > cZxid = 0xd6 > > > > >> > > > > > > > ctime = Thu May 10 09:39:39 MDT 2012 > > > > >> > > > > > > > mZxid = 0xd6 > > > > >> > > > > > > > mtime = Thu May 10 09:39:39 MDT 2012 > > > > >> > > > > > > > pZxid = 0xd6 > > > > >> > > > > > > > cversion = 0 > > > > >> > > > > > > > dataVersion = 0 > > > > >> > > > > > > > aclVersion = 0 > > > > >> > > > > > > > ephemeralOwner = 0x137376733c00002 > > > > >> > > > > > > > dataLength = 18 > > > > >> > > > > > > > numChildren = 0 > > > > >> > > > > > > > > > > > >> > > > > > > > and > > > > >> > > > > > > > > > > > >> > > > > > > > ls /consumers/test_group/offsets/testTopic > > > > >> > > > > > > > [0-0] > > > > >> > > > > > > > get /consumers/test_group/offsets/testTopic/0-0 > > > > >> > > > > > > > -1 > > > > >> > > > > > > > cZxid = 0x1d > > > > >> > > > > > > > ctime = Wed May 09 13:29:24 MDT 2012 > > > > >> > > > > > > > mZxid = 0x106 > > > > >> > > > > > > > mtime = Thu May 10 09:47:18 MDT 2012 > > > > >> > > > > > > > pZxid = 0x1d > > > > >> > > > > > > > cversion = 0 > > > > >> > > > > > > > dataVersion = 211 > > > > >> > > > > > > > aclVersion = 0 > > > > >> > > > > > > > ephemeralOwner = 0x0 > > > > >> > > > > > > > dataLength = 2 > > > > >> > > > > > > > numChildren = 0 > > > > >> > > > > > > > > > > > >> > > > > > > > After sending some messages via the console > producer, > > > the > > > > >> only > > > > >> > > > values > > > > 
>> > > > > > > that > > > > >> > > > > > > > appear to change are (same values omitted): > > > > >> > > > > > > > > > > > >> > > > > > > > get /consumers/test_group/offsets/testTopic/0-0 > > > > >> > > > > > > > mZxid = 0x11a > > > > >> > > > > > > > mtime = Thu May 10 09:50:18 MDT 2012 > > > > >> > > > > > > > dataVersion = 229 > > > > >> > > > > > > > > > > > >> > > > > > > > I'm not sure if that helps any, but I really > > appreciate > > > > your > > > > >> > > > > > assistance. > > > > >> > > > > > > > > > > > >> > > > > > > > On Wed, May 9, 2012 at 3:08 PM, Jun Rao < > > > jun...@gmail.com > > > > > > > > > >> > > wrote: > > > > >> > > > > > > > > > > > >> > > > > > > > > Run bin/zookeeper-shell.sh. This will bring up an > > > > >> interactive > > > > >> > > ZK > > > > >> > > > > > shell. > > > > >> > > > > > > > > Type ? to see the list of commands. You can do > > things > > > > like > > > > >> > "ls > > > > >> > > > > > > > > /consumers/[group_id]/ids" and get " > > > > >> > > > > > > > > /consumers/[group_id]/ids/[consumer_id]". Do this > > > while > > > > >> you > > > > >> > > > > consumer > > > > >> > > > > > > code > > > > >> > > > > > > > > is running. > > > > >> > > > > > > > > > > > > >> > > > > > > > > Jun > > > > >> > > > > > > > > > > > > >> > > > > > > > > On Wed, May 9, 2012 at 12:53 PM, lessonz < > > > > >> > > > lessonz.leg...@gmail.com > > > > >> > > > > > > > > > >> > > > > > > > wrote: > > > > >> > > > > > > > > > > > > >> > > > > > > > > > I must apologize for my ignorance. I'm not sure > > > where > > > > I > > > > >> > > should > > > > >> > > > be > > > > >> > > > > > > > looking > > > > >> > > > > > > > > > for these values. The bin/zookeeper-shell.sh > > doesn't > > > > >> have > > > > >> > > them > > > > >> > > > (I > > > > >> > > > > > > don't > > > > >> > > > > > > > > > think it should). 
When I cleaned the log > directory > > > to > > > > >> start > > > > >> > > > from > > > > >> > > > > > > > > scratch, I > > > > >> > > > > > > > > > get /tmp/zookeeper/version-2/log.1 and > > > > >> > > > /tmp/zookeeper/version-2/. > > > > >> > > > > > > First > > > > >> > > > > > > > > > they both appear to be binary, should they be? > > Since > > > > >> > they're > > > > >> > > > > binary > > > > >> > > > > > > and > > > > >> > > > > > > > > I'm > > > > >> > > > > > > > > > using PuTTY, grepping is of limited value ( > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > > > > > > > http://www.chiark.greenend.org.uk/~sgtatham/putty/faq.html#faq-puttyputty > > > > >> > > > > > > > > > ). > > > > >> > > > > > > > > > The values you mentioned before definitely > appear > > in > > > > >> log.1, > > > > >> > > but > > > > >> > > > > > > because > > > > >> > > > > > > > > of > > > > >> > > > > > > > > > the binary, I'm not seeing any assignment > > operators > > > > and > > > > >> > > trying > > > > >> > > > to > > > > >> > > > > > > make > > > > >> > > > > > > > > > heads or tails of it is currently beyond me. > Also > > > > >> > > interesting, > > > > >> > > > > > pretty > > > > >> > > > > > > > > much > > > > >> > > > > > > > > > immediately log.1 caps at 67108864 bytes and > stays > > > > >> there. > > > > >> > > It's > > > > >> > > > > time > > > > >> > > > > > > > stamp > > > > >> > > > > > > > > > is being updated, so it's still getting touched. > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > So, I'm probably looking in the wrong place. > > > Where/How > > > > >> can > > > > >> > I > > > > >> > > > find > > > > >> > > > > > > these > > > > >> > > > > > > > > > values? 
> > > > >> > > > > > > > > > > > > > >> > > > > > > > > > On Wed, May 9, 2012 at 11:09 AM, Jun Rao < > > > > >> jun...@gmail.com > > > > >> > > > > > > >> > > > > wrote: > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > I would try the test with numStreams=1 and see > > > what > > > > >> > > happens. > > > > >> > > > > > Also, > > > > >> > > > > > > > you > > > > >> > > > > > > > > > may > > > > >> > > > > > > > > > > want to produce some new data after the test > > > starts. > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > If you still have problem, could you get the > > value > > > > of > > > > >> the > > > > >> > > > > > following > > > > >> > > > > > > > > paths > > > > >> > > > > > > > > > > from ZK (bin/zookeeper-shell.sh) after you > > started > > > > >> your > > > > >> > > > > consumer > > > > >> > > > > > > > test? > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > /consumers/[group_id]/ids/[consumer_id] > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > Jun > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > On Wed, May 9, 2012 at 8:49 AM, lessonz < > > > > >> > > > > > lessonz.leg...@gmail.com> > > > > >> > > > > > > > > > wrote: > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > Okay, here is the amended code: > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > import java.util.Calendar; > > > > >> > > > > > > > > > > > import java.util.Collections; > > > > >> > > > > > > > > > > > import java.util.List; > > > > >> > > > > > > > > > > > import java.util.Map; > > > > >> > > > > > > > > > > > import java.util.Properties; > > > > >> > > > > > > > > > > > import java.util.concurrent.ExecutorService; > > > > >> > > > > > > > > > > > import java.util.concurrent.Executors; > > > > >> > > > > > > > > > > > import java.util.logging.Logger; 
> > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > import kafka.consumer.Consumer; > > > > >> > > > > > > > > > > > import kafka.consumer.ConsumerConfig; > > > > >> > > > > > > > > > > > import kafka.consumer.KafkaMessageStream; > > > > >> > > > > > > > > > > > import > > kafka.javaapi.consumer.ConsumerConnector; > > > > >> > > > > > > > > > > > import kafka.message.Message; > > > > >> > > > > > > > > > > > import kafka.serializer.DefaultDecoder; > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > public class KafkaTestConsumer { > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > /** > > > > >> > > > > > > > > > > > * @param args > > > > >> > > > > > > > > > > > */ > > > > >> > > > > > > > > > > > public static void main(final String[] > > args) > > > { > > > > >> > > > > > > > > > > > try { > > > > >> > > > > > > > > > > > final Logger logger = > > > > >> > > > > > > > > > Logger.getLogger("KafkaTestConsumer"); > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > // specify some consumer > properties > > > > >> > > > > > > > > > > > final Properties props = new > > > > >> Properties(); > > > > >> > > > > > > > > > > > props.put("zk.connect", > > > > >> "testserver:2181"); > > > > >> > > > > > > > > > > > props.put(" > zk.connectiontimeout.ms > > ", > > > > >> > > > "1000000"); > > > > >> > > > > > > > > > > > props.put("groupid", > "test_group"); > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > // Create the connection to the > > > cluster > > > > >> > > > > > > > > > > > final ConsumerConfig > > consumerConfig = > > > > new > > > > >> > > > > > > > > > > ConsumerConfig(props); > > > > >> > > > > > > > > > > > final ConsumerConnector > > > > >> consumerConnector = > > > > >> > > > > > > > > > > > > > > > >> Consumer.createJavaConsumerConnector(consumerConfig); > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > // create 4 partitions of the > > stream > > > > for > > > > >> > 
topic > > > > >> > > > > > > > > “testTopic”, > > > > >> > > > > > > > > > to > > > > >> > > > > > > > > > > > allow > > > > >> > > > > > > > > > > > // 4 > > > > >> > > > > > > > > > > > // threads to consume > > > > >> > > > > > > > > > > > final String topicName = > > "testTopic"; > > > > >> > > > > > > > > > > > final int numStreams = 4; > > > > >> > > > > > > > > > > > List<KafkaMessageStream<Message>> > > > > >> streams = > > > > >> > > > null; > > > > >> > > > > > > > > > > > try { > > > > >> > > > > > > > > > > > final Map<String, > > > > >> > > > > > > List<KafkaMessageStream<Message>>> > > > > >> > > > > > > > > > > > topicMessageStreams = consumerConnector > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > .createMessageStreams(Collections.singletonMap(topicName, > > > > >> > > > > > > > > numStreams), > > > > >> > > > > > > > > > > new > > > > >> > > > > > > > > > > > DefaultDecoder()); > > > > >> > > > > > > > > > > > streams = > > > > >> > > > topicMessageStreams.get(topicName); > > > > >> > > > > > > > > > > > } catch (final Exception e) { > > > > >> > > > > > > > > > > > > logger.severe(e.getMessage()); > > > > >> > > > > > > > > > > > } > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > // create list of 4 threads to > > > consume > > > > >> from > > > > >> > > each > > > > >> > > > > of > > > > >> > > > > > > the > > > > >> > > > > > > > > > > > partitions > > > > >> > > > > > > > > > > > final ExecutorService executor = > > > > >> > > > > > > > > > > > Executors.newFixedThreadPool(numStreams); > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > // consume the messages in the > > > threads > > > > >> > > > > > > > > > > > for (final > > > KafkaMessageStream<Message> > > > > >> > stream > > > > >> > > : > > > > >> > > > > > > > streams) { > > > > >> > > > > > > > > > > > executor.submit(new > Runnable() > > { > > > > >> > > > > > > > > > > > @Override > > > > >> > > > > > > > > > > > public 
void run() { > > > > >> > > > > > > > > > > > try { > > > > >> > > > > > > > > > > > while (true) { > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > logger.severe(Calendar.getInstance().getTime().toString()); > > > > >> > > > > > > > > > > > if (stream == > > > > null) { > > > > >> > > > > > > > > > > > > > > > >> > logger.severe("stream > > > > >> > > is > > > > >> > > > > > > > NULL."); > > > > >> > > > > > > > > > > > } else { > > > > >> > > > > > > > > > > > > > > > >> > logger.severe("stream > > > > >> > > = > > > > >> > > > " > > > > >> > > > > + > > > > >> > > > > > > > > stream); > > > > >> > > > > > > > > > > > for > (final > > > > >> Message > > > > >> > > > > message > > > > >> > > > > > : > > > > >> > > > > > > > > > stream) > > > > >> > > > > > > > > > > { > > > > >> > > > > > > > > > > > > > > > >> > > logger.severe("!"); > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > logger.severe(message.toString()); > > > > >> > > > > > > > > > > > } > > > > >> > > > > > > > > > > > } > > > > >> > > > > > > > > > > > } > > > > >> > > > > > > > > > > > } catch (final > > Throwable > > > > t) > > > > >> { > > > > >> > > > > > > > > > > > logger.severe("In > > run > > > > " + > > > > >> > > > > > > > t.getMessage()); > > > > >> > > > > > > > > > > > } finally { > > > > >> > > > > > > > > > > > if (stream == > > null) { > > > > >> > > > > > > > > > > > > > > > >> logger.severe("stream is > > > > >> > > > > > NULL."); > > > > >> > > > > > > > > > > > } else { > > > > >> > > > > > > > > > > > > > > > >> logger.severe("stream = > > > > >> > " > > > > >> > > + > > > > >> > > > > > > stream); > > > > >> > > > > > > > > > > > } > > > > >> > > > > > > > > > > > } > > > > >> > > > > > > > > > > > } > > > > >> > > > > > > > > > > > }); > > > > >> > > > > > > > > > > > } > > > > >> > > > > > > > > > > > } catch (final Throwable t) { > > > > >> > > > > > > > > > > > System.err.println("In main" + > > > > >> > > t.getMessage()); > > > 
> >> > > > > > > > > > > > } > > > > >> > > > > > > > > > > > } > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > } > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > Interesting things happen. I get the time > > > printed > > > > >> out > > > > >> > > only > > > > >> > > > > once > > > > >> > > > > > > for > > > > >> > > > > > > > > > each > > > > >> > > > > > > > > > > > stream. If I use eclipse's debugging and a > > > > >> breakpoint > > > > >> > on > > > > >> > > > the > > > > >> > > > > > line > > > > >> > > > > > > > "if > > > > >> > > > > > > > > > > > (stream == null) {" in the "while (true) {" > > > loop, > > > > >> the > > > > >> > > > > variable > > > > >> > > > > > > > stream > > > > >> > > > > > > > > > > says > > > > >> > > > > > > > > > > > "<error(s)_during_the_evaluation>" for > value. > > > If I > > > > >> step > > > > >> > > > over > > > > >> > > > > > this > > > > >> > > > > > > > > line, > > > > >> > > > > > > > > > > I'm > > > > >> > > > > > > > > > > > taken into the else clause, but the logger > is > > > > never > > > > >> > > > executed, > > > > >> > > > > > and > > > > >> > > > > > > > > seems > > > > >> > > > > > > > > > > to > > > > >> > > > > > > > > > > > die when referencing the stream value. So, I > > got > > > > to > > > > >> > > > thinking > > > > >> > > > > > > maybe > > > > >> > > > > > > > > the > > > > >> > > > > > > > > > > > problem is a missing dependency somewhere or > > > > maybe a > > > > >> > > > > conflict. 
> > > > >> > > > > > > So, > > > > >> > > > > > > > > here > > > > >> > > > > > > > > > > are > > > > >> > > > > > > > > > > > the dependencies I have in that project's > pom > > > (the > > > > >> > > project > > > > >> > > > > has > > > > >> > > > > > > > other > > > > >> > > > > > > > > > > > pieces): > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > <dependencies> > > > > >> > > > > > > > > > > > <!-- Import the JMS API, we use > > provided > > > > >> scope > > > > >> > as > > > > >> > > > the > > > > >> > > > > > API > > > > >> > > > > > > is > > > > >> > > > > > > > > > > > included in > > > > >> > > > > > > > > > > > JBoss AS 7 --> > > > > >> > > > > > > > > > > > <dependency> > > > > >> > > > > > > > > > > > > > > > >> <groupId>org.jboss.spec.javax.jms</groupId> > > > > >> > > > > > > > > > > > > > > > >> > > <artifactId>jboss-jms-api_1.1_spec</artifactId> > > > > >> > > > > > > > > > > > <scope>provided</scope> > > > > >> > > > > > > > > > > > </dependency> > > > > >> > > > > > > > > > > > <!-- Import the JCA API, we use > > provided > > > > >> scope > > > > >> > as > > > > >> > > > the > > > > >> > > > > > API > > > > >> > > > > > > is > > > > >> > > > > > > > > > > > included in > > > > >> > > > > > > > > > > > JBoss AS 7 --> > > > > >> > > > > > > > > > > > <dependency> > > > > >> > > > > > > > > > > > > > > > >> > > <groupId>org.jboss.spec.javax.resource</groupId> > > > > >> > > > > > > > > > > > > > > > >> > > > > > <artifactId>jboss-connector-api_1.6_spec</artifactId> > > > > >> > > > > > > > > > > > <scope>provided</scope> > > > > >> > > > > > > > > > > > </dependency> > > > > >> > > > > > > > > > > > <dependency> > > > > >> > > > > > > > > > > > > > > > >> > <groupId>org.apache.incubator.kafka</groupId> > > > > >> > > > > > > > > > > > <artifactId>kafka</artifactId> > > > > >> > > > > > > > > > > > > <version>0.7.0-incubating</version> > > > > >> > > > > > > > > > > > </dependency> > > > > >> > > > > > > > > > > > </dependencies> > > 
> > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > And here is the pom I'm using for kafka: > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > <?xml version="1.0" encoding="UTF-8"?> > > > > >> > > > > > > > > > > > <project xsi:schemaLocation=" > > > > >> > > > > http://maven.apache.org/POM/4.0.0 > > > > >> > > > > > > > > > > > http://maven.apache.org/xsd/maven-4.0.0.xsd > " > > > > >> xmlns=" > > > > >> > > > > > > > > > > > http://maven.apache.org/POM/4.0.0" > > xmlns:xsi=" > > > > >> > > > > > > > > > > > http://www.w3.org/2001/XMLSchema-instance"> > > > > >> > > > > > > > > > > > <modelVersion>4.0.0</modelVersion> > > > > >> > > > > > > > > > > > > > <groupId>org.apache.incubator.kafka</groupId> > > > > >> > > > > > > > > > > > <artifactId>kafka</artifactId> > > > > >> > > > > > > > > > > > <packaging>pom</packaging> > > > > >> > > > > > > > > > > > <version>0.7.0-incubating</version> > > > > >> > > > > > > > > > > > <name>Apache Kafka</name> > > > > >> > > > > > > > > > > > <description>Apache Kafka is a > distributed > > > > >> > > > > publish-subscribe > > > > >> > > > > > > > > > messaging > > > > >> > > > > > > > > > > > system</description> > > > > >> > > > > > > > > > > > <url>http://incubator.apache.org/kafka > > </url> > > > > >> > > > > > > > > > > > <inceptionYear>2012</inceptionYear> > > > > >> > > > > > > > > > > > <licenses> > > > > >> > > > > > > > > > > > <license> > > > > >> > > > > > > > > > > > <name>The Apache Software > License, > > > > >> Version > > > > >> > > > > > 2.0</name> > > > > >> > > > > > > > > > > > <url> > > > > >> > > > > http://www.apache.org/licenses/LICENSE-2.0.txt > > > > >> > > > > > > > </url> > > > > >> > > > > > > > > > > > </license> > > > > >> > > > > > > > > > > > </licenses> > > > > >> > > > > > > > > > > > <dependencies> > > > > >> > > > > > > > > > > > <dependency> > > > > >> > > > > > > > > > > > <groupId>org.scala-lang</groupId> > > > > >> > > > > > > > > > > > > > > 
<artifactId>scala-library</artifactId> > > > > >> > > > > > > > > > > > <version>2.8.0</version> > > > > >> > > > > > > > > > > > </dependency> > > > > >> > > > > > > > > > > > <dependency> > > > > >> > > > > > > > > > > > > > > > <groupId>com.github.sgroschupf</groupId> > > > > >> > > > > > > > > > > > <artifactId>zkclient</artifactId> > > > > >> > > > > > > > > > > > <version>0.1</version> > > > > >> > > > > > > > > > > > </dependency> > > > > >> > > > > > > > > > > > </dependencies> > > > > >> > > > > > > > > > > > <scm> > > > > >> > > > > > > > > > > > <connection>scm:svn: > > > > >> > > > > > > > > > > > > > > > >> http://svn.apache.org/repos/asf/incubator/kafka/trunk > > > > >> > > > > > > </connection> > > > > >> > > > > > > > > > > > <developerConnection>scm:svn: > > > > >> > > > > > > > > > > > > > > > >> http://svn.apache.org/repos/asf/incubator/kafka/trunk > > > > >> > > > > > > > > > > > </developerConnection> > > > > >> > > > > > > > > > > > <url> > > > > >> > > > > > > http://svn.apache.org/repos/asf/incubator/kafka/trunk > > > > >> > > > > > > > > > </url> > > > > >> > > > > > > > > > > > </scm> > > > > >> > > > > > > > > > > > </project> > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > Again, any input would be greatly > appreciated. > > > > >> Thanks. > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > On Tue, May 8, 2012 at 6:34 PM, Jun Rao < > > > > >> > > jun...@gmail.com> > > > > >> > > > > > > wrote: > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > Could you put consumer run() code in > > try/catch > > > > and > > > > >> > log > > > > >> > > > all > > > > >> > > > > > > > > > throwables? > > > > >> > > > > > > > > > > > > Executors can eat exceptions. 
> > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > Thanks, > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > Jun > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > On Tue, May 8, 2012 at 4:08 PM, lessonz < > > > > >> > > > > > > > lessonz.leg...@gmail.com> > > > > >> > > > > > > > > > > > wrote: > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > After trying and failing to get a more > > > > >> complicated > > > > >> > > > > consumer > > > > >> > > > > > > > > > working, > > > > >> > > > > > > > > > > I > > > > >> > > > > > > > > > > > > > decided to start at square one and use > the > > > > >> example > > > > >> > > > code. > > > > >> > > > > > > Below > > > > >> > > > > > > > is > > > > >> > > > > > > > > > my > > > > >> > > > > > > > > > > > > barely > > > > >> > > > > > > > > > > > > > modified implementation: > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > import java.util.Collections; > > > > >> > > > > > > > > > > > > > import java.util.List; > > > > >> > > > > > > > > > > > > > import java.util.Map; > > > > >> > > > > > > > > > > > > > import java.util.Properties; > > > > >> > > > > > > > > > > > > > import > > java.util.concurrent.ExecutorService; > > > > >> > > > > > > > > > > > > > import java.util.concurrent.Executors; > > > > >> > > > > > > > > > > > > > import java.util.logging.Logger; > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > import kafka.consumer.Consumer; > > > > >> > > > > > > > > > > > > > import kafka.consumer.ConsumerConfig; > > > > >> > > > > > > > > > > > > > import > kafka.consumer.KafkaMessageStream; > > > > >> > > > > > > > > > > > > > import > > > > kafka.javaapi.consumer.ConsumerConnector; > > > > >> > > > > > > > > > > > > > import kafka.message.Message; > > > > >> > > > > > > > > > > > > > import kafka.serializer.DefaultDecoder; > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > 
public class KafkaTestConsumer { > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > /** > > > > >> > > > > > > > > > > > > > * @param args > > > > >> > > > > > > > > > > > > > */ > > > > >> > > > > > > > > > > > > > public static void main(final > String[] > > > > args) > > > > >> { > > > > >> > > > > > > > > > > > > > final Logger logger = > > > > >> > > > > > > > > Logger.getLogger("KafkaTestConsumer"); > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > // specify some consumer > properties > > > > >> > > > > > > > > > > > > > final Properties props = new > > > > >> Properties(); > > > > >> > > > > > > > > > > > > > props.put("zk.connect", > > > > >> "testserver:2181"); > > > > >> > > > > > > > > > > > > > props.put(" > zk.connectiontimeout.ms > > ", > > > > >> > > > "1000000"); > > > > >> > > > > > > > > > > > > > props.put("groupid", > "test_group"); > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > // Create the connection to the > > > cluster > > > > >> > > > > > > > > > > > > > final ConsumerConfig > > consumerConfig = > > > > new > > > > >> > > > > > > > > > > ConsumerConfig(props); > > > > >> > > > > > > > > > > > > > final ConsumerConnector > > > > >> consumerConnector = > > > > >> > > > > > > > > > > > > > > > > > >> > Consumer.createJavaConsumerConnector(consumerConfig); > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > // create 4 partitions of the > > stream > > > > for > > > > >> > topic > > > > >> > > > > > > > > “testTopic”, > > > > >> > > > > > > > > > to > > > > >> > > > > > > > > > > > > > allow 4 > > > > >> > > > > > > > > > > > > > // threads to consume > > > > >> > > > > > > > > > > > > > final String topicName = > > "testTopic"; > > > > >> > > > > > > > > > > > > > final int numStreams = 4; > > > > >> > > > > > > > > > > > > > List<KafkaMessageStream<Message>> > > > > >> streams = > > > > >> > > > null; > > > > >> > > > > > > > > > > > > > try { > > 
> > >> > > > > > > > > > > > > > final Map<String, > > > > >> > > > > > > List<KafkaMessageStream<Message>>> > > > > >> > > > > > > > > > > > > > topicMessageStreams = consumerConnector > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > .createMessageStreams(Collections.singletonMap(topicName, > > > > >> > > > > > > > > > > numStreams), > > > > >> > > > > > > > > > > > > new > > > > >> > > > > > > > > > > > > > DefaultDecoder()); > > > > >> > > > > > > > > > > > > > streams = > > > > >> > > > topicMessageStreams.get(topicName); > > > > >> > > > > > > > > > > > > > } catch (final Exception e) { > > > > >> > > > > > > > > > > > > > > logger.severe(e.getMessage()); > > > > >> > > > > > > > > > > > > > } > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > // create list of 4 threads to > > > consume > > > > >> from > > > > >> > > each > > > > >> > > > > of > > > > >> > > > > > > the > > > > >> > > > > > > > > > > > partitions > > > > >> > > > > > > > > > > > > > final ExecutorService executor = > > > > >> > > > > > > > > > > > > > > Executors.newFixedThreadPool(numStreams); > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > // consume the messages in the > > > threads > > > > >> > > > > > > > > > > > > > for (final > > > KafkaMessageStream<Message> > > > > >> > stream > > > > >> > > : > > > > >> > > > > > > > streams) { > > > > >> > > > > > > > > > > > > > executor.submit(new > Runnable() > > { > > > > >> > > > > > > > > > > > > > @Override > > > > >> > > > > > > > > > > > > > public void run() { > > > > >> > > > > > > > > > > > > > for (final Message > > > message > > > > : > > > > >> > > > stream) { > > > > >> > > > > > > > > > > > > > > logger.severe("!"); > > > > >> > > > > > > > > > > > > > > > > > >> > > > logger.severe(message.toString()); > > > > >> > > > > > > > > > > > > > } > > > > >> > > > > > > > > > > > > > } > > > > >> > > > > > > > > > > > > > }); > > > > >> > > > > > > 
> > > > > > > } > > > > >> > > > > > > > > > > > > > } > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > } > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > It runs, I get no errors, and nothing > > > > happens. I > > > > >> > > don't > > > > >> > > > > get > > > > >> > > > > > > any > > > > >> > > > > > > > > > > > messages. > > > > >> > > > > > > > > > > > > I > > > > >> > > > > > > > > > > > > > don't THINK it's an issue with my Kafka > > > > install > > > > >> for > > > > >> > > two > > > > >> > > > > > > > reasons: > > > > >> > > > > > > > > 1. > > > > >> > > > > > > > > > > > > > Zookeeper logs my client connection. 2. > > > > (Granted > > > > >> > it's > > > > >> > > > all > > > > >> > > > > > on > > > > >> > > > > > > > > > > localhost > > > > >> > > > > > > > > > > > > but) > > > > >> > > > > > > > > > > > > > When I used the console consumer and > > > producer > > > > on > > > > >> > the > > > > >> > > > > > > instance, > > > > >> > > > > > > > > they > > > > >> > > > > > > > > > > > seem > > > > >> > > > > > > > > > > > > to > > > > >> > > > > > > > > > > > > > work just fine. > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > Methodology is to start Zookeeper, start > > > > Kafka, > > > > >> > start > > > > >> > > > > above > > > > >> > > > > > > > > > > > application, > > > > >> > > > > > > > > > > > > > and then connect a console producer to > > > generate > > > > >> > > > messages. > > > > >> > > > > > I'm > > > > >> > > > > > > > > really > > > > >> > > > > > > > > > > at > > > > >> > > > > > > > > > > > a > > > > >> > > > > > > > > > > > > > loss as to what's happening. 
> > Interestingly, > > > > if I > > > > >> > put > > > > >> > > in > > > > >> > > > > > > > > > breakpoints, > > > > >> > > > > > > > > > > I > > > > >> > > > > > > > > > > > > seem > > > > >> > > > > > > > > > > > > > to lose a handle as I eventually lose > the > > > > >> ability > > > > >> > to > > > > >> > > > step > > > > >> > > > > > > over, > > > > >> > > > > > > > > > step > > > > >> > > > > > > > > > > > > into, > > > > >> > > > > > > > > > > > > > and so on. > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > I'd really appreciate any input. > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > Cheers. > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > > > > > > > > > > > > > > > > >