Neha,

I followed the procedure described at https://github.com/kafka-dev/kafka/tree/kafka-v0.6/contrib/hadoop-consumer but couldn't get the DistributedCache working. So, for the time being, I copied the jars to the task nodes by hand to get it to run.
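To see whether a jar shipped through the DistributedCache actually ended up on a task's classpath, a small class-visibility check can help. This is a minimal, hypothetical diagnostic (ClassVisibilityCheck is not part of Kafka or Hadoop) that uses only the JDK and assumes it runs with the same classpath as the map task. The stack trace in this thread shows the lookup going through sun.misc.Launcher$AppClassLoader, which is the JVM's system class loader, so that is the loader main() queries by default.

```java
import java.net.URL;
import java.security.CodeSource;

// Hypothetical diagnostic helper; not part of Kafka or Hadoop.
public class ClassVisibilityCheck {

    // True if `loader` can find the class. initialize=false, so static initializers do not run.
    public static boolean isLoadable(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Jar or directory the class was loaded from, or null if unknown or not loadable.
    public static String sourceOf(String className, ClassLoader loader) {
        try {
            CodeSource cs = Class.forName(className, false, loader)
                    .getProtectionDomain().getCodeSource();
            URL location = (cs == null) ? null : cs.getLocation();
            return (location == null) ? null : location.toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // The stack trace shows sun.misc.Launcher$AppClassLoader, i.e. the system class loader.
        ClassLoader loader = ClassLoader.getSystemClassLoader();
        String name = (args.length > 0) ? args[0] : "kafka.javaapi.consumer.SimpleConsumer";
        System.out.println(name + " loadable: " + isLoadable(name, loader));
        System.out.println("loaded from: " + sourceOf(name, loader));
        System.out.println("java.class.path = " + System.getProperty("java.class.path"));
    }
}
```

Run it with `java -cp <the task's classpath> ClassVisibilityCheck`, or call isLoadable from inside the task itself. If it reports `false`, the jar never reached that class loader, which would be consistent with Neha's suspicion that kafka-0.6.jar was not registered correctly.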
I'm not sure whether this part of the examples has been fully tested, or whether I have the wrong version of the file. For example, here is getNext() from KafkaETLContext.java:

    public boolean getNext(KafkaETLKey key, BytesWritable value) throws IOException {
        if ( !hasMore() ) return false;
        boolean gotNext = get(key, value);
        Iterator<ByteBufferMessageSet> iter = _response.iterator();
        while ( !gotNext && _response != null && iter.hasNext()) {
            ByteBufferMessageSet msgSet = iter.next();
            if ( hasError(msgSet)) return false;
            _messageIt = (Iterator<Message>) msgSet.iterator();
            gotNext = get(key, value);
        }
        return gotNext;
    }

The null check for _response comes after Iterator<ByteBufferMessageSet> iter = _response.iterator(), but the first time this method is called, _response is null, so that line throws a NullPointerException before the check is ever reached.

Another question: if I generate, say, 2000 events and set kafka.fetch.limit = 3000 (or -1), I'd expect the job to stop after reading the 2000 events, but it seems to keep looping from the beginning. Am I missing something here?

Sam

On Aug 9, 2011, at 4:29 PM, Neha Narkhede wrote:

> Sam,
>
> I tried this on a downloaded copy of Kafka v0.6 <http://sna-projects.com/kafka/downloads/kafka-0.6.zip>:
>
> nnarkhed-md:kafka nnarkhed$ jar tvf kafka-0.6.jar | grep "kafka.javaapi.consumer.SimpleConsumer"
>   3501 Tue May 24 10:23:24 PDT 2011 kafka/javaapi/consumer/SimpleConsumer.class
>
> I suspect that kafka-0.6.jar is not registered correctly with the
> DistributedCache.
>
> Thanks,
> Neha
>
> On Tue, Aug 9, 2011 at 3:32 PM, Sam William <sa...@stumbleupon.com> wrote:
>
>> I'm trying to run this sample Hadoop consumer in the 0.6 version.
>> I see that the jar files (including kafka-0.6.jar) look correct when they are copied to
>> the DistributedCache, but I get this exception:
>>
>> Error: java.lang.ClassNotFoundException: kafka.javaapi.consumer.SimpleConsumer
>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:307)
>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:248)
>>     at kafka.etl.KafkaETLContext.(KafkaETLContext.java:93)
>>     at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:115)
>>     at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:14)
>>     at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:192)
>>     at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:176)
>>     at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
>>     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
>>     at org.apache.hadoop.mapred.Child.main(Child.java:170)
>>
>> Is there something I'm missing here?
>>
>> Sam William
>> sa...@stumbleupon.com
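Regarding the getNext() ordering problem in KafkaETLContext.java described in this thread, a minimal sketch of a null-safe ordering follows. It uses simplified, hypothetical stand-in types (SafeResponseReader and a List-of-Lists "response" are not the contrib classes): each message set is a list of String messages, and the hasError() check and the KafkaETLKey/BytesWritable outputs are left out. The null check guards the first use of the response, and the response iterator is created once rather than on every call.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified model of KafkaETLContext.getNext(); not the contrib code.
public class SafeResponseReader {
    // Plays the role of an iterator over _response; null when there is no response yet.
    private final Iterator<List<String>> respIt;
    // Plays the role of _messageIt: messages of the current message set.
    private Iterator<String> messageIt = Collections.emptyIterator();

    public SafeResponseReader(List<List<String>> response) {
        // The null check happens before any method is called on the response.
        this.respIt = (response == null) ? null : response.iterator();
    }

    // Returns the next message, or null when there is nothing left to read.
    public String getNext() {
        if (messageIt.hasNext()) {
            return messageIt.next();
        }
        // Only advance through message sets when a response actually exists.
        while (respIt != null && respIt.hasNext()) {
            messageIt = respIt.next().iterator();
            if (messageIt.hasNext()) {
                return messageIt.next();
            }
        }
        return null;
    }
}
```

In the quoted code, `_response.iterator()` is called on every invocation, so each call gets a fresh iterator positioned at the first message set. Keeping one iterator per fetched response avoids that; whether it is related to the looping described in the kafka.fetch.limit question is not established in this thread.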