How about using printf, instead of logger?

Jun
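A minimal sketch of the printf suggestion, assuming the payload bytes are UTF-8 text. The `ByteBuffer` below is only a stand-in for what `message.payload()` returns in the consumer loop, and the class name `PayloadPrinter` and helper `payloadToString` are hypothetical names chosen for illustration, not part of Kafka:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadPrinter {

    // Copies the readable bytes out of the buffer and decodes them as UTF-8
    // (the charset is an assumption). duplicate() leaves the caller's buffer
    // position untouched.
    public static String payloadToString(final ByteBuffer payload) {
        final ByteBuffer copy = payload.duplicate();
        final byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(final String[] args) {
        // Stand-in for message.payload(); no Kafka classes are needed here.
        final ByteBuffer payload =
                ByteBuffer.wrap("hello kafka".getBytes(StandardCharsets.UTF_8));

        // printf writes straight to stdout, bypassing any logger configuration.
        System.out.printf("payload = %s%n", payloadToString(payload));
        System.out.flush();
    }
}
```

Inside the consumer loop the same call would be `System.out.printf("payload = %s%n", payloadToString(message.payload()))`. If that line prints while the logger stays silent, the logging setup is the more likely culprit than the stream.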
On Fri, May 11, 2012 at 1:40 PM, lessonz <lessonz.leg...@gmail.com> wrote:
> Well, here is probably a better pom for Kafka:
>
> <?xml version="1.0" encoding="UTF-8"?>
> <project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>     xmlns="http://maven.apache.org/POM/4.0.0"
>     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
>   <modelVersion>4.0.0</modelVersion>
>   <groupId>org.apache.incubator.kafka</groupId>
>   <artifactId>kafka</artifactId>
>   <packaging>pom</packaging>
>   <version>0.7.0-incubating</version>
>   <name>Apache Kafka</name>
>   <description>Apache Kafka is a distributed publish-subscribe messaging system</description>
>   <url>http://incubator.apache.org/kafka</url>
>   <inceptionYear>2012</inceptionYear>
>   <licenses>
>     <license>
>       <name>The Apache Software License, Version 2.0</name>
>       <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
>     </license>
>   </licenses>
>   <dependencies>
>     <dependency>
>       <groupId>org.scala-lang</groupId>
>       <artifactId>scala-library</artifactId>
>       <version>2.8.0</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.zookeeper</groupId>
>       <artifactId>zookeeper</artifactId>
>       <version>3.3.3</version>
>       <scope>compile</scope>
>     </dependency>
>     <dependency>
>       <groupId>net.sf.jopt-simple</groupId>
>       <artifactId>jopt-simple</artifactId>
>       <version>3.2</version>
>       <scope>compile</scope>
>     </dependency>
>     <dependency>
>       <groupId>log4j</groupId>
>       <artifactId>log4j</artifactId>
>       <version>1.2.15</version>
>       <scope>compile</scope>
>     </dependency>
>     <dependency>
>       <groupId>zkclient</groupId>
>       <artifactId>zkclient</artifactId>
>       <version>20110412</version>
>       <scope>compile</scope>
>     </dependency>
>   </dependencies>
>   <scm>
>     <connection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</connection>
>     <developerConnection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</developerConnection>
>     <url>http://svn.apache.org/repos/asf/incubator/kafka/trunk</url>
>   </scm>
> </project>
>
> Unfortunately, I still can't get my client to work. I'm at a complete
> loss as for how to proceed. Can anyone else run the consumer code
> presented on the quickstart page:
> https://incubator.apache.org/kafka/quickstart.html ?
>
> On Fri, May 11, 2012 at 12:59 PM, lessonz <lessonz.leg...@gmail.com> wrote:
>
> > I realize message.toString() is not the way to access the payload, but
> > unless Scala changes this, in Java, if nothing else, toString should use
> > Object's implementation which should at least print an object reference.
> > In any event, I modified the for-loop to:
> >
> > for (final Message message : stream) {
> >     logger.severe("!");
> >     final ByteBuffer buffer = message.payload();
> >     final byte[] bytes = new byte[buffer.remaining()];
> >     buffer.get(bytes);
> >     logger.severe(new String(bytes));
> > }
> >
> > And there was no change in behavior. I'm now trying to familiarize myself
> > with sbt to try and figure out if I'm missing a dependency. Thanks.
> >
> > On Fri, May 11, 2012 at 10:39 AM, Jun Rao <jun...@gmail.com> wrote:
> >
> >> Also, message.toString() is not the right way to convert the message to
> >> String. You need to take the payload from the message and then convert
> >> the bytes to string. Take a look at
> >> examples/src/main/java/kafka/examples/ExampleUtils.java
> >>
> >> Jun
> >>
> >> On Fri, May 11, 2012 at 9:30 AM, lessonz <lessonz.leg...@gmail.com> wrote:
> >>
> >> > Upon commenting out the logging output (the first and the identical one
> >> > in the finally clause), I'm still not getting anything from the server.
> >> > I'm assuming the sample client code works for others. That's why I'm
> >> > worried my problem might be more environmental, like a missing
> >> > dependency. Has anyone else tested the sample code?
> >> >
> >> > On Fri, May 11, 2012 at 10:13 AM, Jun Rao <jun...@gmail.com> wrote:
> >> >
> >> > > Try not to do logger.severe("stream = " + stream). I am not sure if a
> >> > > stream is printable.
> >> > >
> >> > > Jun
> >> > >
> >> > > On Fri, May 11, 2012 at 8:27 AM, lessonz <lessonz.leg...@gmail.com> wrote:
> >> > >
> >> > > > Okay, here's the newly amended code:
> >> > > >
> >> > > > import java.util.Calendar;
> >> > > > import java.util.Collections;
> >> > > > import java.util.List;
> >> > > > import java.util.Map;
> >> > > > import java.util.Properties;
> >> > > > import java.util.logging.Logger;
> >> > > >
> >> > > > import kafka.consumer.Consumer;
> >> > > > import kafka.consumer.ConsumerConfig;
> >> > > > import kafka.consumer.KafkaMessageStream;
> >> > > > import kafka.javaapi.consumer.ConsumerConnector;
> >> > > > import kafka.message.Message;
> >> > > > import kafka.serializer.DefaultDecoder;
> >> > > >
> >> > > > public class KafkaTestConsumer {
> >> > > >
> >> > > >     /**
> >> > > >      * @param args
> >> > > >      */
> >> > > >     public static void main(final String[] args) {
> >> > > >         final Logger logger = Logger.getLogger("KafkaTestConsumer");
> >> > > >         try {
> >> > > >
> >> > > >             // specify some consumer properties
> >> > > >             final Properties props = new Properties();
> >> > > >             props.put("zk.connect", "testServer:2181");
> >> > > >             props.put("zk.connectiontimeout.ms", "1000000");
> >> > > >             props.put("groupid", "test_group");
> >> > > >
> >> > > >             // Create the connection to the cluster
> >> > > >             final ConsumerConfig consumerConfig = new ConsumerConfig(props);
> >> > > >             final ConsumerConnector consumerConnector =
> >> > > >                     Consumer.createJavaConsumerConnector(consumerConfig);
> >> > > >
> >> > > >             // create 4 partitions of the stream for topic “testTopic”, to allow
> >> > > >             // 4
> >> > > >             // threads to consume
> >> > > >             final String topicName = "testTopic";
> >> > > >             final int numStreams = 1;
> >> > > >             List<KafkaMessageStream<Message>> streams = null;
> >> > > >             try {
> >> > > >                 final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams = consumerConnector
> >> > > >                         .createMessageStreams(Collections.singletonMap(topicName, numStreams), new DefaultDecoder());
> >> > > >                 streams = topicMessageStreams.get(topicName);
> >> > > >             } catch (final Exception e) {
> >> > > >                 logger.severe(e.getMessage());
> >> > > >             }
> >> > > >
> >> > > >             final KafkaMessageStream<Message> stream = streams.get(0);
> >> > > >
> >> > > >             final Thread thread = new Thread(new Runnable() {
> >> > > >                 @Override
> >> > > >                 public void run() {
> >> > > >                     try {
> >> > > >                         while (true) {
> >> > > >                             logger.severe(Calendar.getInstance().getTime().toString());
> >> > > >                             if (stream == null) {
> >> > > >                                 logger.severe("stream is NULL.");
> >> > > >                             } else {
> >> > > >                                 logger.severe("stream = " + stream);
> >> > > >                                 for (final Message message : stream) {
> >> > > >                                     logger.severe("!");
> >> > > >                                     logger.severe(message.toString());
> >> > > >                                 }
> >> > > >                             }
> >> > > >                         }
> >> > > >                     } catch (final Throwable t) {
> >> > > >                         logger.severe("In run " + t.getMessage());
> >> > > >                     } finally {
> >> > > >                         if (stream == null) {
> >> > > >                             logger.severe("stream is NULL.");
> >> > > >                         } else {
> >> > > >                             logger.severe("stream = " + stream);
> >> > > >                         }
> >> > > >                     }
> >> > > >                 }
> >> > > >             });
> >> > > >
> >> > > >             thread.start();
> >> > > >         } catch (final Throwable t) {
> >> > > >             logger.severe("In main" + t.getMessage());
> >> > > >         }
> >> > > >     }
> >> > > > }
> >> > > >
> >> > > > Behavior is identical to using the executor. I get the logged time, but
> >> > > > nothing else. Using debugging stream is not null, but as soon as I try
> >> > > > to reference it in the line "logger.severe("stream = " + stream);" it
> >> > > > dies. I don't get that output.
> >> > > > So, it seems in both cases, my call to
> >> > > > consumerConnector.createMessageStreams(...) is returning a Map of the
> >> > > > right size, but its contents appear to be corrupt.
> >> > > >
> >> > > > I have the source, but I really know nothing about Scala. In looking at
> >> > > > kafka.javaapi.consumer.ZookeeperConsumerConnector#createMessageStreams,
> >> > > > it all seems pretty straightforward. I have to wonder again about
> >> > > > whether I'm somehow throwing a silent class definition not found
> >> > > > exception. For dependencies I only have scala-library 2.8.0 and
> >> > > > zkclient 0.1. Is that really all that's needed? Are they the right
> >> > > > versions?
> >> > > >
> >> > > > Again, thanks for your help.
> >> > > >
> >> > > > On Thu, May 10, 2012 at 6:01 PM, Jun Rao <jun...@gmail.com> wrote:
> >> > > >
> >> > > > > Another thing that I would suggest is to not use Executors and start
> >> > > > > your own thread for consumption.
> >> > > > >
> >> > > > > Jun
> >> > > > >
> >> > > > > On Thu, May 10, 2012 at 12:14 PM, lessonz <lessonz.leg...@gmail.com> wrote:
> >> > > > >
> >> > > > > > Yes, get /consumers/test_group/offsets/testTopic/0-0 shows values
> >> > > > > > changing. Neither /consumers/[group_id]/ids/[consumer_id] nor
> >> > > > > > /consumers/test_group/owners change any values. Hopefully that
> >> > > > > > tells us something? Again, thanks for all of your help.
> >> > > > > >
> >> > > > > > On Thu, May 10, 2012 at 10:28 AM, Jun Rao <jun...@gmail.com> wrote:
> >> > > > > >
> >> > > > > > > So you are saying that the value in
> >> > > > > > > /consumers/test_group/offsets/testTopic/0-0 is moving? That
> >> > > > > > > typically means that the consumer is consuming. What about the
> >> > > > > > > values of /consumers/test_group/ids and
> >> > > > > > > /consumers/test_group/owners?
> >> > > > > > >
> >> > > > > > > Jun
> >> > > > > > >
> >> > > > > > > On Thu, May 10, 2012 at 8:53 AM, lessonz <lessonz.leg...@gmail.com> wrote:
> >> > > > > > >
> >> > > > > > > > Well, that makes more sense. Sorry about that.
> >> > > > > > > >
> >> > > > > > > > Okay, so here are the results (I subbed in the [client machine
> >> > > > > > > > name] for the actual value):
> >> > > > > > > >
> >> > > > > > > > ls /consumers/test_group/ids
> >> > > > > > > > [test_group_[client machine name]-1336664541413-4179e37b]
> >> > > > > > > > get /consumers/test_group/ids/test_group_[client machine name]-1336664541413-4179e37b
> >> > > > > > > > { "testTopic": 1 }
> >> > > > > > > > cZxid = 0xd6
> >> > > > > > > > ctime = Thu May 10 09:39:39 MDT 2012
> >> > > > > > > > mZxid = 0xd6
> >> > > > > > > > mtime = Thu May 10 09:39:39 MDT 2012
> >> > > > > > > > pZxid = 0xd6
> >> > > > > > > > cversion = 0
> >> > > > > > > > dataVersion = 0
> >> > > > > > > > aclVersion = 0
> >> > > > > > > > ephemeralOwner = 0x137376733c00002
> >> > > > > > > > dataLength = 18
> >> > > > > > > > numChildren = 0
> >> > > > > > > >
> >> > > > > > > > and
> >> > > > > > > >
> >> > > > > > > > ls /consumers/test_group/offsets/testTopic
> >> > > > > > > > [0-0]
> >> > > > > > > > get /consumers/test_group/offsets/testTopic/0-0
> >> > > > > > > > -1
> >> > > > > > > > cZxid = 0x1d
> >> > > > > > > > ctime = Wed May 09 13:29:24 MDT 2012
> >> > > > > > > > mZxid = 0x106
> >> > > > > > > > mtime = Thu May 10 09:47:18 MDT 2012
> >> > > > > > > > pZxid = 0x1d
> >> > > > > > > > cversion = 0
> >> > > > > > > > dataVersion = 211
> >> > > > > > > > aclVersion = 0
> >> > > > > > > > ephemeralOwner = 0x0
> >> > > > > > > > dataLength = 2
> >> > > > > > > > numChildren = 0
> >> > > > > > > >
> >> > > > > > > > After sending some messages via the console producer, the only
> >> > > > > > > > values that appear to change are (same values omitted):
> >> > > > > > > >
> >> > > > > > > > get /consumers/test_group/offsets/testTopic/0-0
> >> > > > > > > > mZxid = 0x11a
> >> > > > > > > > mtime = Thu May 10 09:50:18 MDT 2012
> >> > > > > > > > dataVersion = 229
> >> > > > > > > >
> >> > > > > > > > I'm not sure if that helps any, but I really appreciate your
> >> > > > > > > > assistance.
> >> > > > > > > >
> >> > > > > > > > On Wed, May 9, 2012 at 3:08 PM, Jun Rao <jun...@gmail.com> wrote:
> >> > > > > > > >
> >> > > > > > > > > Run bin/zookeeper-shell.sh. This will bring up an interactive
> >> > > > > > > > > ZK shell. Type ? to see the list of commands. You can do
> >> > > > > > > > > things like "ls /consumers/[group_id]/ids" and get
> >> > > > > > > > > "/consumers/[group_id]/ids/[consumer_id]". Do this while your
> >> > > > > > > > > consumer code is running.
> >> > > > > > > > >
> >> > > > > > > > > Jun
> >> > > > > > > > >
> >> > > > > > > > > On Wed, May 9, 2012 at 12:53 PM, lessonz <lessonz.leg...@gmail.com> wrote:
> >> > > > > > > > >
> >> > > > > > > > > > I must apologize for my ignorance. I'm not sure where I
> >> > > > > > > > > > should be looking for these values. The
> >> > > > > > > > > > bin/zookeeper-shell.sh doesn't have them (I don't think it
> >> > > > > > > > > > should). When I cleaned the log directory to start from
> >> > > > > > > > > > scratch, I get /tmp/zookeeper/version-2/log.1 and
> >> > > > > > > > > > /tmp/zookeeper/version-2/. First they both appear to be
> >> > > > > > > > > > binary, should they be? Since they're binary and I'm using
> >> > > > > > > > > > PuTTY, grepping is of limited value
> >> > > > > > > > > > (http://www.chiark.greenend.org.uk/~sgtatham/putty/faq.html#faq-puttyputty).
> >> > > > > > > > > > The values you mentioned before definitely appear in log.1,
> >> > > > > > > > > > but because of the binary, I'm not seeing any assignment
> >> > > > > > > > > > operators and trying to make heads or tails of it is
> >> > > > > > > > > > currently beyond me. Also interesting, pretty much
> >> > > > > > > > > > immediately log.1 caps at 67108864 bytes and stays there.
> >> > > > > > > > > > Its time stamp is being updated, so it's still getting
> >> > > > > > > > > > touched.
> >> > > > > > > > > >
> >> > > > > > > > > > So, I'm probably looking in the wrong place. Where/How can I
> >> > > > > > > > > > find these values?
> >> > > > > > > > > >
> >> > > > > > > > > > On Wed, May 9, 2012 at 11:09 AM, Jun Rao <jun...@gmail.com> wrote:
> >> > > > > > > > > >
> >> > > > > > > > > > > I would try the test with numStreams=1 and see what
> >> > > > > > > > > > > happens. Also, you may want to produce some new data after
> >> > > > > > > > > > > the test starts.
> >> > > > > > > > > > >
> >> > > > > > > > > > > If you still have a problem, could you get the value of the
> >> > > > > > > > > > > following paths from ZK (bin/zookeeper-shell.sh) after you
> >> > > > > > > > > > > started your consumer test?
> >> > > > > > > > > > >
> >> > > > > > > > > > > /consumers/[group_id]/ids/[consumer_id]
> >> > > > > > > > > > >
> >> > > > > > > > > > > /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
> >> > > > > > > > > > >
> >> > > > > > > > > > > Jun
> >> > > > > > > > > > >
> >> > > > > > > > > > > On Wed, May 9, 2012 at 8:49 AM, lessonz <lessonz.leg...@gmail.com> wrote:
> >> > > > > > > > > > >
> >> > > > > > > > > > > > Okay, here is the amended code:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > import java.util.Calendar;
> >> > > > > > > > > > > > import java.util.Collections;
> >> > > > > > > > > > > > import java.util.List;
> >> > > > > > > > > > > > import java.util.Map;
> >> > > > > > > > > > > > import java.util.Properties;
> >> > > > > > > > > > > > import java.util.concurrent.ExecutorService;
> >> > > > > > > > > > > > import java.util.concurrent.Executors;
> >> > > > > > > > > > > > import java.util.logging.Logger;
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > import kafka.consumer.Consumer;
> >> > > > > > > > > > > > import kafka.consumer.ConsumerConfig;
> >> > > > > > > > > > > > import kafka.consumer.KafkaMessageStream;
> >> > > > > > > > > > > > import kafka.javaapi.consumer.ConsumerConnector;
> >> > > > > > > > > > > > import kafka.message.Message;
> >> > > > > > > > > > > > import kafka.serializer.DefaultDecoder;
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > public class KafkaTestConsumer {
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >     /**
> >> > > > > > > > > > > >      * @param args
> >> > > > > > > > > > > >      */
> >> > > > > > > > > > > >     public static void main(final String[] args) {
> >> > > > > > > > > > > >         try {
> >> > > > > > > > > > > >             final Logger logger = Logger.getLogger("KafkaTestConsumer");
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >             // specify some consumer properties
> >> > > > > > > > > > > >             final Properties props = new Properties();
> >> > > > > > > > > > > >             props.put("zk.connect", "testserver:2181");
> >> > > > > > > > > > > >             props.put("zk.connectiontimeout.ms", "1000000");
> >> > > > > > > > > > > >             props.put("groupid", "test_group");
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >             // Create the connection to the cluster
> >> > > > > > > > > > > >             final ConsumerConfig consumerConfig = new ConsumerConfig(props);
> >> > > > > > > > > > > >             final ConsumerConnector consumerConnector =
> >> > > > > > > > > > > >                     Consumer.createJavaConsumerConnector(consumerConfig);
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >             // create 4 partitions of the stream for topic “testTopic”, to allow
> >> > > > > > > > > > > >             // 4
> >> > > > > > > > > > > >             // threads to consume
> >> > > > > > > > > > > >             final String topicName = "testTopic";
> >> > > > > > > > > > > >             final int numStreams = 4;
> >> > > > > > > > > > > >             List<KafkaMessageStream<Message>> streams = null;
> >> > > > > > > > > > > >             try {
> >> > > > > > > > > > > >                 final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams = consumerConnector
> >> > > > > > > > > > > >                         .createMessageStreams(Collections.singletonMap(topicName, numStreams), new DefaultDecoder());
> >> > > > > > > > > > > >                 streams = topicMessageStreams.get(topicName);
> >> > > > > > > > > > > >             } catch (final Exception e) {
> >> > > > > > > > > > > >                 logger.severe(e.getMessage());
> >> > > > > > > > > > > >             }
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >             // create list of 4 threads to consume from each of the partitions
> >> > > > > > > > > > > >             final ExecutorService executor = Executors.newFixedThreadPool(numStreams);
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >             // consume the messages in the threads
> >> > > > > > > > > > > >             for (final KafkaMessageStream<Message> stream : streams) {
> >> > > > > > > > > > > >                 executor.submit(new Runnable() {
> >> > > > > > > > > > > >                     @Override
> >> > > > > > > > > > > >                     public void run() {
> >> > > > > > > > > > > >                         try {
> >> > > > > > > > > > > >                             while (true) {
> >> > > > > > > > > > > >                                 logger.severe(Calendar.getInstance().getTime().toString());
> >> > > > > > > > > > > >                                 if (stream == null) {
> >> > > > > > > > > > > >                                     logger.severe("stream is NULL.");
> >> > > > > > > > > > > >                                 } else {
> >> > > > > > > > > > > >                                     logger.severe("stream = " + stream);
> >> > > > > > > > > > > >                                     for (final Message message : stream) {
> >> > > > > > > > > > > >                                         logger.severe("!");
> >> > > > > > > > > > > >                                         logger.severe(message.toString());
> >> > > > > > > > > > > >                                     }
> >> > > > > > > > > > > >                                 }
> >> > > > > > > > > > > >                             }
> >> > > > > > > > > > > >                         } catch (final Throwable t) {
> >> > > > > > > > > > > >                             logger.severe("In run " + t.getMessage());
> >> > > > > > > > > > > >                         } finally {
> >> > > > > > > > > > > >                             if (stream == null) {
> >> > > > > > > > > > > >                                 logger.severe("stream is NULL.");
> >> > > > > > > > > > > >                             } else {
> >> > > > > > > > > > > >                                 logger.severe("stream = " + stream);
> >> > > > > > > > > > > >                             }
> >> > > > > > > > > > > >                         }
> >> > > > > > > > > > > >                     }
> >> > > > > > > > > > > >                 });
> >> > > > > > > > > > > >             }
> >> > > > > > > > > > > >         } catch (final Throwable t) {
> >> > > > > > > > > > > >             System.err.println("In main" + t.getMessage());
> >> > > > > > > > > > > >         }
> >> > > > > > > > > > > >     }
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > }
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Interesting things happen. I get the time printed out only once
> >> > > > > > > > > > > > for each stream. If I use Eclipse's debugging and a breakpoint
> >> > > > > > > > > > > > on the line "if (stream == null) {" in the "while (true) {"
> >> > > > > > > > > > > > loop, the variable stream says
> >> > > > > > > > > > > > "<error(s)_during_the_evaluation>" for value. If I step over
> >> > > > > > > > > > > > this line, I'm taken into the else clause, but the logger is
> >> > > > > > > > > > > > never executed, and seems to die when referencing the stream
> >> > > > > > > > > > > > value. So, I got to thinking maybe the problem is a missing
> >> > > > > > > > > > > > dependency somewhere or maybe a conflict. So, here are the
> >> > > > > > > > > > > > dependencies I have in that project's pom (the project has
> >> > > > > > > > > > > > other pieces):
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > <dependencies>
> >> > > > > > > > > > > >   <!-- Import the JMS API, we use provided scope as the API is included in JBoss AS 7 -->
> >> > > > > > > > > > > >   <dependency>
> >> > > > > > > > > > > >     <groupId>org.jboss.spec.javax.jms</groupId>
> >> > > > > > > > > > > >     <artifactId>jboss-jms-api_1.1_spec</artifactId>
> >> > > > > > > > > > > >     <scope>provided</scope>
> >> > > > > > > > > > > >   </dependency>
> >> > > > > > > > > > > >   <!-- Import the JCA API, we use provided scope as the API is included in JBoss AS 7 -->
> >> > > > > > > > > > > >   <dependency>
> >> > > > > > > > > > > >     <groupId>org.jboss.spec.javax.resource</groupId>
> >> > > > > > > > > > > >     <artifactId>jboss-connector-api_1.6_spec</artifactId>
> >> > > > > > > > > > > >     <scope>provided</scope>
> >> > > > > > > > > > > >   </dependency>
> >> > > > > > > > > > > >   <dependency>
> >> > > > > > > > > > > >     <groupId>org.apache.incubator.kafka</groupId>
> >> > > > > > > > > > > >     <artifactId>kafka</artifactId>
> >> > > > > > > > > > > >     <version>0.7.0-incubating</version>
> >> > > > > > > > > > > >   </dependency>
> >> > > > > > > > > > > > </dependencies>
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > And here is the pom I'm using for kafka:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > <?xml version="1.0" encoding="UTF-8"?>
> >> > > > > > > > > > > > <project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
> >> > > > > > > > > > > >     xmlns="http://maven.apache.org/POM/4.0.0"
> >> > > > > > > > > > > >     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
> >> > > > > > > > > > > >   <modelVersion>4.0.0</modelVersion>
> >> > > > > > > > > > > >   <groupId>org.apache.incubator.kafka</groupId>
> >> > > > > > > > > > > >   <artifactId>kafka</artifactId>
> >> > > > > > > > > > > >   <packaging>pom</packaging>
> >> > > > > > > > > > > >   <version>0.7.0-incubating</version>
> >> > > > > > > > > > > >   <name>Apache Kafka</name>
> >> > > > > > > > > > > >   <description>Apache Kafka is a distributed publish-subscribe messaging system</description>
> >> > > > > > > > > > > >   <url>http://incubator.apache.org/kafka</url>
> >> > > > > > > > > > > >   <inceptionYear>2012</inceptionYear>
> >> > > > > > > > > > > >   <licenses>
> >> > > > > > > > > > > >     <license>
> >> > > > > > > > > > > >       <name>The Apache Software License, Version 2.0</name>
> >> > > > > > > > > > > >       <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
> >> > > > > > > > > > > >     </license>
> >> > > > > > > > > > > >   </licenses>
> >> > > > > > > > > > > >   <dependencies>
> >> > > > > > > > > > > >     <dependency>
> >> > > > > > > > > > > >       <groupId>org.scala-lang</groupId>
> >> > > > > > > > > > > >       <artifactId>scala-library</artifactId>
> >> > > > > > > > > > > >       <version>2.8.0</version>
> >> > > > > > > > > > > >     </dependency>
> >> > > > > > > > > > > >     <dependency>
> >> > > > > > > > > > > >       <groupId>com.github.sgroschupf</groupId>
> >> > > > > > > > > > > >       <artifactId>zkclient</artifactId>
> >> > > > > > > > > > > >       <version>0.1</version>
> >> > > > > > > > > > > >     </dependency>
> >> > > > > > > > > > > >   </dependencies>
> >> > > > > > > > > > > >   <scm>
> >> > > > > > > > > > > >     <connection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</connection>
> >> > > > > > > > > > > >     <developerConnection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</developerConnection>
> >> > > > > > > > > > > >     <url>http://svn.apache.org/repos/asf/incubator/kafka/trunk</url>
> >> > > > > > > > > > > >   </scm>
> >> > > > > > > > > > > > </project>
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Again, any input would be greatly appreciated. Thanks.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > On Tue, May 8, 2012 at 6:34 PM, Jun Rao <jun...@gmail.com> wrote:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > > Could you put consumer run() code in try/catch and log all
> >> > > > > > > > > > > > > throwables? Executors can eat exceptions.
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Thanks,
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Jun
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > On Tue, May 8, 2012 at 4:08 PM, lessonz <lessonz.leg...@gmail.com> wrote:
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > > After trying and failing to get a more complicated consumer
> >> > > > > > > > > > > > > > working, I decided to start at square one and use the
> >> > > > > > > > > > > > > > example code. Below is my barely modified implementation:
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > import java.util.Collections;
> >> > > > > > > > > > > > > > import java.util.List;
> >> > > > > > > > > > > > > > import java.util.Map;
> >> > > > > > > > > > > > > > import java.util.Properties;
> >> > > > > > > > > > > > > > import java.util.concurrent.ExecutorService;
> >> > > > > > > > > > > > > > import java.util.concurrent.Executors;
> >> > > > > > > > > > > > > > import java.util.logging.Logger;
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > import kafka.consumer.Consumer;
> >> > > > > > > > > > > > > > import kafka.consumer.ConsumerConfig;
> >> > > > > > > > > > > > > > import kafka.consumer.KafkaMessageStream;
> >> > > > > > > > > > > > > > import kafka.javaapi.consumer.ConsumerConnector;
> >> > > > > > > > > > > > > > import kafka.message.Message;
> >> > > > > > > > > > > > > > import kafka.serializer.DefaultDecoder;
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > public class KafkaTestConsumer {
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > > >     /**
> >> > > > > > > > > > > > > >      * @param args
> >> > > > > > > > > > > > > >      */
> >> > > > > > > > > > > > > >     public static void main(final String[] args) {
> >> > > > > > > > > > > > > >         final Logger logger = Logger.getLogger("KafkaTestConsumer");
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > > >         // specify some consumer properties
> >> > > > > > > > > > > > > >         final Properties props = new Properties();
> >> > > > > > > > > > > > > >         props.put("zk.connect", "testserver:2181");
> >> > > > > > > > > > > > > >         props.put("zk.connectiontimeout.ms", "1000000");
> >> > > > > > > > > > > > > >         props.put("groupid", "test_group");
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > > >         // Create the connection to the cluster
> >> > > > > > > > > > > > > >         final ConsumerConfig consumerConfig = new ConsumerConfig(props);
> >> > > > > > > > > > > > > >         final ConsumerConnector consumerConnector =
> >> > > > > > > > > > > > > >                 Consumer.createJavaConsumerConnector(consumerConfig);
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > > >         // create 4 partitions of the stream for topic “testTopic”, to allow 4
> >> > > > > > > > > > > > > >         // threads to consume
> >> > > > > > > > > > > > > >         final String topicName = "testTopic";
> >> > > > > > > > > > > > > >         final int numStreams = 4;
> >> > > > > > > > > > > > > >         List<KafkaMessageStream<Message>> streams = null;
> >> > > > > > > > > > > > > >         try {
> >> > > > > > > > > > > > > >             final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams = consumerConnector
> >> > > > > > > > > > > > > >                     .createMessageStreams(Collections.singletonMap(topicName, numStreams), new DefaultDecoder());
> >> > > > > > > > > > > > > >             streams = topicMessageStreams.get(topicName);
> >> > > > > > > > > > > > > >         } catch (final Exception e) {
> >> > > > > > > > > > > > > >             logger.severe(e.getMessage());
> >> > > > > > > > > > > > > >         }
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > > >         // create list of 4 threads to consume from each of the partitions
> >> > > > > > > > > > > > > >         final ExecutorService executor = Executors.newFixedThreadPool(numStreams);
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > > >         // consume the messages in the threads
> >> > > > > > > > > > > > > >         for (final KafkaMessageStream<Message> stream : streams) {
> >> > > > > > > > > > > > > >             executor.submit(new Runnable() {
> >> > > > > > > > > > > > > >                 @Override
> >> > > > > > > > > > > > > >                 public void run() {
> >> > > > > > > > > > > > > >                     for (final Message message : stream) {
> >> > > > > > > > > > > > > >                         logger.severe("!");
> >> > > > > > > > > > > > > >                         logger.severe(message.toString());
> >> > > > > > > > > > > > > >                     }
> >> > > > > > > > > > > > > >                 }
> >> > > > > > > > > > > > > >             });
> >> > > > > > > > > > > > > >         }
> >> > > > > > > > > > > > > >     }
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > }
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > It runs, I get no errors, and nothing happens. I don't get
> >> > > > > > > > > > > > > > any messages. I don't THINK it's an issue with my Kafka
> >> > > > > > > > > > > > > > install for two reasons: 1. Zookeeper logs my client
> >> > > > > > > > > > > > > > connection. 2. (Granted it's all on localhost but) When I
> >> > > > > > > > > > > > > > used the console consumer and producer on the instance,
> >> > > > > > > > > > > > > > they seem to work just fine.
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > Methodology is to start Zookeeper, start Kafka, start above
> >> > > > > > > > > > > > > > application, and then connect a console producer to
> >> > > > > > > > > > > > > > generate messages. I'm really at a loss as to what's
> >> > > > > > > > > > > > > > happening.
> >> > > > > > > > > > > > > > Interestingly, if I put in breakpoints, I seem to lose a
> >> > > > > > > > > > > > > > handle as I eventually lose the ability to step over, step
> >> > > > > > > > > > > > > > into, and so on.
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > I'd really appreciate any input.
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > Cheers.
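
On the point raised earlier in this thread that Executors can eat exceptions, here is a minimal sketch using only java.util.concurrent, with no Kafka classes involved. The class name `SwallowedExceptionDemo`, the method `runAndReport`, and the failure message are hypothetical, chosen for illustration. A task passed to `submit()` that throws does not print anything by itself; the failure is stored in the returned `Future` and only surfaces when `get()` is called:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SwallowedExceptionDemo {

    // Submits a task that always fails and returns the failure's message as
    // recovered through Future.get(). Without the get() call, the exception
    // would stay inside the Future and nothing would be reported.
    public static String runAndReport() {
        final ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            final Future<?> future = executor.submit(new Runnable() {
                @Override
                public void run() {
                    throw new IllegalStateException("consumer failed");
                }
            });
            try {
                future.get();
                return "no failure";
            } catch (final ExecutionException e) {
                // The original throwable is wrapped; getCause() unwraps it.
                return e.getCause().getMessage();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(final String[] args) {
        System.out.printf("task failed with: %s%n", runAndReport());
    }
}
```

This is why wrapping the body of run() in try/catch (Throwable), or keeping the Future and calling get(), is needed to see failures such as a missing class at runtime when the consumer runs inside an executor.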