Hello everyone! :) I'm having trouble using the Kafka Hadoop consumer included in contrib/hadoop-consumer, and I'd like to know if/how it is used at LinkedIn or elsewhere. I would also appreciate it if someone could confirm or correct the assumptions I make below.
Here's what I have so far: it works when pulling from one Kafka broker, but not when pulling from many. There are two problems.

The first problem concerns the offset files that the Map/Reduce job takes as its input. From what I understand, these offset files represent the offset to start reading from on each of the Kafka brokers. To generate those files the first time (and thus start from offset -1), we can go into contrib/hadoop-consumer/ and run:

    ./run-class.sh kafka.etl.impl.DataGenerator my-properties-file.properties

The problem is that this DataGenerator class can take only one Kafka broker in its parameters (the properties file) and thus generates only one offset file. The Map/Reduce job then spawns one map task for each offset file it finds in its input directory, and each of these map tasks connects to a different Kafka broker. Since the DataGenerator can only generate one offset file, the Map/Reduce job spawns only one map task, which queries only one Kafka broker.

Unless my assumptions are wrong or someone else provides a nice alternative solution, I was planning to modify the DataGenerator class so that it can generate multiple offset files (I've put a rough sketch of what I mean at the end of this message). For now, as a manual work-around, I just duplicated the offset files and specified a different Kafka broker in each. Beyond that, I'm thinking a more robust solution would be ZK-based discovery of the available brokers (also sketched below). Again, I'm curious to find out how this is done at LinkedIn or elsewhere.

The second problem occurs when I run the M/R job. If I run it with the multiple offset files I manually generated as its input, it does spawn three map tasks, as expected, but each of them then fails with the following error:

java.io.IOException: java.io.EOFException
    at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:166)
    at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:30)
    at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:208)
    at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
    at org.apache.hadoop.mapred.Child.main(Child.java:264)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:180)
    at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
    at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
    at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1945)
    at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2077)
    at org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:76)
    at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:128)
    ... 11 more

It fails before writing anything whatsoever, and it fails repeatedly for each map task until the JobTracker reaches the maximum number of failures per task and marks the job as failed. I haven't figured this one out yet (one sanity check I still plan to try is sketched below)...

Any help would be greatly appreciated! :)

Thanks! :)

--
Felix
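P.S. In case it helps the discussion, here is roughly the DataGenerator change I have in mind. Everything in it is a placeholder: the broker list, topic, and input path are made up, and I'm using Text/Text records just to show the one-file-per-broker structure; the real DataGenerator would keep writing whatever key/value classes the KafkaETL map tasks actually expect.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class MultiBrokerOffsetGenerator {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);

            // Placeholders -- in the real version these would come from the
            // properties file, e.g. a new comma-separated broker list property.
            String[] brokers = { "broker1:9092", "broker2:9092", "broker3:9092" };
            String topic = "my-topic";
            Path inputDir = new Path("/tmp/kafka-etl/input");

            for (int i = 0; i < brokers.length; i++) {
                // One offset file per broker, so the M/R job spawns one map
                // task per broker instead of a single task overall.
                Path offsetFile = new Path(inputDir, "offsets-" + i);
                SequenceFile.Writer writer = SequenceFile.createWriter(
                        fs, conf, offsetFile, Text.class, Text.class);
                try {
                    // Offset -1 = start from the beginning, as with the
                    // single-broker DataGenerator.
                    writer.append(new Text(topic),
                                  new Text(brokers[i] + "\t-1"));
                } finally {
                    writer.close();
                }
            }
        }
    }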
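For the ZK-based discovery idea, I mean something along these lines. One caveat: the /brokers/ids path and the "creatorId:host:port" value format are my assumptions from skimming how Kafka registers brokers in ZooKeeper, so they would need to be verified against the actual version in use.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.zookeeper.ZooKeeper;

    public class ZkBrokerDiscovery {
        // Returns "host:port" strings for every broker currently registered
        // in ZooKeeper, instead of reading a single broker from the
        // properties file.
        public static List<String> discoverBrokers(String zkConnect) throws Exception {
            ZooKeeper zk = new ZooKeeper(zkConnect, 30000, null);
            try {
                List<String> brokers = new ArrayList<String>();
                for (String id : zk.getChildren("/brokers/ids", false)) {
                    byte[] data = zk.getData("/brokers/ids/" + id, false, null);
                    // Assumed registration format "creatorId:host:port";
                    // keep only "host:port".
                    String[] parts = new String(data, "UTF-8").split(":");
                    brokers.add(parts[1] + ":" + parts[2]);
                }
                return brokers;
            } finally {
                zk.close();
            }
        }
    }

DataGenerator could then build its broker list from ZK at startup, and the manual duplication of offset files would go away entirely.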
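And for the second problem, the sanity check I mentioned: since the EOFException comes out of SequenceFile$Reader, I want to read my hand-duplicated offset files back directly to see whether they are even valid SequenceFiles (maybe my copies are truncated or corrupt). Something like this, using only stock Hadoop APIs, with the key/value classes instantiated reflectively so I don't need to know the KafkaETL types:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.util.ReflectionUtils;

    public class OffsetFileCheck {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            // args[0] = path of one offset file, e.g. /tmp/kafka-etl/input/offsets-0
            SequenceFile.Reader reader =
                    new SequenceFile.Reader(fs, new Path(args[0]), conf);
            try {
                // Instantiate whatever key/value classes the file declares.
                Writable key = (Writable)
                        ReflectionUtils.newInstance(reader.getKeyClass(), conf);
                Writable value = (Writable)
                        ReflectionUtils.newInstance(reader.getValueClass(), conf);
                int n = 0;
                while (reader.next(key, value)) {
                    n++;
                }
                System.out.println(n + " records read OK from " + args[0]);
            } finally {
                reader.close();
            }
        }
    }

If this blows up with the same EOFException, the problem is my duplicated files rather than the M/R job itself.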