For the offset part, there is one offset per partition, so there will be as many map tasks as the total number of partitions, not brokers.
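To illustrate the arithmetic: with, say, three brokers each hosting two partitions of the topic, the ETL input needs six offset entries and the job will spawn six map tasks. A rough sketch of that relationship (the broker list, partition count, and helper methods below are hypothetical placeholders, not the contrib/hadoop-consumer code):

import java.util.Arrays;
import java.util.List;

public class OffsetLayoutSketch {
    public static void main(String[] args) {
        List<String> brokers = Arrays.asList("broker1:9092", "broker2:9092", "broker3:9092");
        int mapTasks = 0;
        for (String broker : brokers) {
            // Hypothetical: ask each broker how many partitions it hosts for the topic.
            int partitions = fetchPartitionCount(broker, "my-topic");
            for (int p = 0; p < partitions; p++) {
                // One offset entry per (broker, partition); -1 means start from the beginning.
                writeOffsetEntry(broker, "my-topic", p, -1L);
                mapTasks++;
            }
        }
        System.out.println("Expected map tasks: " + mapTasks); // 6 in this example
    }

    // Placeholder stubs for illustration only.
    static int fetchPartitionCount(String broker, String topic) { return 2; }
    static void writeOffsetEntry(String broker, String topic, int partition, long offset) { }
}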
Jun

On Tue, Oct 18, 2011 at 9:01 AM, Felix Giguere Villegas <felix.gigu...@mate1inc.com> wrote:

> Hello everyone :) !
>
> I have trouble using the Kafka hadoop consumer included in contrib/hadoop-consumer and I'd like to know if/how it is used at LinkedIn or elsewhere? I would also like it if someone could confirm or correct the assumptions I make below.
>
> Here's what I have so far:
>
> It works when pulling from one Kafka broker, but not when pulling from many. There are two problems:
>
> The first problem concerns the offset files that the Map/Reduce job takes as its input. From what I understand, these offset files represent the offset to start reading from on each of the Kafka brokers.
>
> To generate those files the first time (and thus start from offset -1), we can go into contrib/hadoop-consumer/ and run:
>
> ./run-class.sh kafka.etl.impl.DataGenerator my-properties-file.properties
>
> The problem is that this DataGenerator class can take only one Kafka broker in its parameters (the properties file) and thus generates only one offset file.
>
> The Map/Reduce job will then spawn one map task for each offset file it finds in its input directory, and each of these map tasks will connect to a different Kafka broker. Since the DataGenerator can only generate one offset file, the Map/Reduce job only spawns one map task, which queries only one Kafka broker.
>
> Unless my assumptions are wrong or someone else provides a nice alternative solution, I was planning to modify the DataGenerator class so that it can generate multiple offset files, but for now, as a manual work-around, I just duplicated the offset files and specified a different Kafka broker in each.
>
> Other than that, I am thinking perhaps a more robust solution would be to have ZK-based discovery of the available brokers. Again, I'm curious to find out how this is done at LinkedIn or elsewhere?
>
> The second problem is when I run the M/R job.
> If I run it with the multiple offset files I manually generated as its input, it does spawn three map tasks, as expected, but it then fails with the following error:
>
> java.io.IOException: java.io.EOFException
>         at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:166)
>         at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:30)
>         at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:208)
>         at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193)
>         at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
>         at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
>         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
>         at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:396)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
>         at org.apache.hadoop.mapred.Child.main(Child.java:264)
> Caused by: java.io.EOFException
>         at java.io.DataInputStream.readFully(DataInputStream.java:180)
>         at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
>         at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
>         at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1945)
>         at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2077)
>         at org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:76)
>         at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:128)
>         ... 11 more
>
> It fails before writing anything whatsoever, and it fails repeatedly for each map task until the JobTracker reaches the maximum number of failures per task and marks the job as failed.
>
> I haven't figured this one out yet...
>
> Any help would be greatly appreciated :) !
>
> Thanks :) !!!!
>
> --
> Felix
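On the ZK-based discovery idea raised above: in Kafka 0.7 each broker registers itself in ZooKeeper, so an offset generator could enumerate the live brokers instead of relying on the single broker URI in the properties file. A rough sketch using the plain ZooKeeper client (the /brokers/ids path follows the 0.7 registration layout; the node data format mentioned in the comment is an assumption):

import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class BrokerDiscoverySketch {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("zk-host:2181", 30000, new Watcher() {
            public void process(WatchedEvent event) { /* no-op for a one-shot read */ }
        });
        try {
            // Kafka 0.7 brokers register ephemeral nodes under /brokers/ids/<brokerId>.
            List<String> ids = zk.getChildren("/brokers/ids", false);
            for (String id : ids) {
                byte[] data = zk.getData("/brokers/ids/" + id, false, null);
                // The node data is expected to look like "creatorId:host:port" (assumption);
                // each broker found here could then get its own offset file in the ETL input.
                System.out.println("broker " + id + " -> " + new String(data, "UTF-8"));
            }
        } finally {
            zk.close();
        }
    }
}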