I was using the kafka-v0.6 branch. It seems this null pointer issue has been fixed on master.
However, I see that the _response.iterator() method is being invoked multiple times on the same response object. Each call starts a new iterator from the first message set, and this is what was causing the loops. I'd keep the iterator object as the class attribute instead of the _response object.

Sam

On Aug 11, 2011, at 2:07 PM, Sam William wrote:

> Neha,
> I followed the procedure described in
>
> https://github.com/kafka-dev/kafka/tree/kafka-v0.6/contrib/hadoop-consumer
>
> I couldn't get the distributed cache working. So, for the time being, I went
> ahead with copying the jars to the task nodes to get it to run.
>
> I'm not sure if this particular portion of the examples is completely tested
> or if I have a wrong version of the file.
>
> For example, in the file KafkaETLContext.java:
>
>     public boolean getNext(KafkaETLKey key, BytesWritable value) throws IOException {
>         if ( !hasMore() ) return false;
>
>         boolean gotNext = get(key, value);
>
>         Iterator<ByteBufferMessageSet> iter = _response.iterator();
>
>         while ( !gotNext && _response != null && iter.hasNext()) {
>             ByteBufferMessageSet msgSet = iter.next();
>             if ( hasError(msgSet)) return false;
>             _messageIt = (Iterator<Message>) msgSet.iterator();
>             gotNext = get(key, value);
>         }
>
>         return gotNext;
>     }
>
> The null check for _response comes after Iterator<ByteBufferMessageSet>
> iter = _response.iterator(), and the first time this function is called, the
> _response object is null.
>
> Another question:
> If I just generate, say, 2000 events and set kafka.fetch.limit = 3000 or
> (-1), I'd expect the job to stop after reading the 2000 events, but it
> seems to keep looping from the beginning. Am I missing something here?
>
> Sam
>
> On Aug 9, 2011, at 4:29 PM, Neha Narkhede wrote:
>
>> Sam,
>>
>> I tried this on a downloaded copy of kafka
>> v0.6 <http://sna-projects.com/kafka/downloads/kafka-0.6.zip>:
>>
>> nnarkhed-md:kafka nnarkhed$ jar tvf kafka-0.6.jar | grep "kafka.javaapi.consumer.SimpleConsumer"
>>   3501 Tue May 24 10:23:24 PDT 2011 kafka/javaapi/consumer/SimpleConsumer.class
>>
>> I suspect that the kafka-0.6.jar is not registered correctly with the
>> DistributedCache.
>>
>> Thanks,
>> Neha
>>
>> On Tue, Aug 9, 2011 at 3:32 PM, Sam William <sa...@stumbleupon.com> wrote:
>>
>>> I'm trying to run this sample hadoop consumer in the 0.6 version. I see
>>> that the jar files (including kafka-0.6.jar) are proper when being copied to
>>> the DistributedCache, but I get the exception
>>>
>>> Error: java.lang.ClassNotFoundException: kafka.javaapi.consumer.SimpleConsumer
>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:307)
>>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:248)
>>>     at kafka.etl.KafkaETLContext.(KafkaETLContext.java:93)
>>>     at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:115)
>>>     at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:14)
>>>     at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:192)
>>>     at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:176)
>>>     at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
>>>     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
>>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
>>>     at org.apache.hadoop.mapred.Child.main(Child.java:170)
>>>
>>> Is there something I'm missing here?
>>>
>>> Sam William
>>> sa...@stumbleupon.com
>
> Sam William
> sa...@stumbleupon.com

Sam William
sa...@stumbleupon.com
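
P.S. To make the iterator suggestion at the top of this message concrete, here is a rough, self-contained sketch. It is not the actual KafkaETLContext code: the class names (IteratorFieldSketch, RestartingReader, FieldIteratorReader, readAtMost) are made up for illustration, and plain List<String> objects stand in for ByteBufferMessageSet and Message. It only shows why calling iterator() on the same response on every call restarts from the first message set, and how holding the iterator as a field (with a null guard) avoids both the looping and the null pointer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IteratorFieldSketch {

    /** Hypothetical minimal reader interface; returns null when nothing is left. */
    interface Reader {
        String getNext();
    }

    /** Keeps the response and asks it for a fresh iterator each time (the looping pattern). */
    static class RestartingReader implements Reader {
        private final Iterable<List<String>> response;
        private Iterator<String> messageIt;

        RestartingReader(Iterable<List<String>> response) {
            this.response = response;
        }

        public String getNext() {
            if (messageIt != null && messageIt.hasNext()) return messageIt.next();
            // A new iterator always begins at the first message set again.
            Iterator<List<String>> iter = response.iterator();
            while (iter.hasNext()) {
                messageIt = iter.next().iterator();
                if (messageIt.hasNext()) return messageIt.next();
            }
            return null;
        }
    }

    /** Keeps the iterator itself as a field, so progress survives across calls. */
    static class FieldIteratorReader implements Reader {
        private Iterator<List<String>> responseIt; // null until a response arrives
        private Iterator<String> messageIt;

        void setResponse(Iterable<List<String>> response) {
            // iterator() is called exactly once per response.
            responseIt = (response == null) ? null : response.iterator();
            messageIt = null;
        }

        public String getNext() {
            while (true) {
                if (messageIt != null && messageIt.hasNext()) return messageIt.next();
                // Null guard comes before any use of the iterator.
                if (responseIt == null || !responseIt.hasNext()) return null;
                messageIt = responseIt.next().iterator();
            }
        }
    }

    /** Reads until the reader returns null or max messages have been read. */
    static List<String> readAtMost(Reader reader, int max) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < max; i++) {
            String msg = reader.getNext();
            if (msg == null) break;
            out.add(msg);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> response =
                Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));

        System.out.println(readAtMost(new RestartingReader(response), 5));

        FieldIteratorReader fixed = new FieldIteratorReader();
        fixed.setResponse(response);
        System.out.println(readAtMost(fixed, 5));
    }
}
```

With the two message sets above, the restarting reader never reaches "c" and keeps replaying the first set until the cap of 5 is hit, while the field-based reader returns each message once and then stops. That matches the behavior I saw with kafka.fetch.limit, although the real cause in the ETL code may involve more than this.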