Ah, interesting. So then I guess it would make sense to have as many
partitions as I have nodes in my Hadoop cluster, or a multiple of the
number of nodes in the cluster, in order to maximize the import speed?
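(To put made-up numbers on that: if 3 brokers each hosted 4 partitions of
the topic, there would be 12 offset files and therefore 12 map tasks.
Assuming one map task per node at a time, a 12-, 6- or 4-node cluster would
keep every node busy for the whole import, while an 8-node cluster would
leave 4 nodes idle during the last wave. At least that is how I read Jun's
explanation below.)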
Also, can I have just one offset file? Or would I need to somehow generate
one offset per partition?

--
Felix

On Tue, Oct 18, 2011 at 12:36 PM, Jun Rao <jun...@gmail.com> wrote:

> For the offset part, there is one offset per partition. So there will be
> as many map tasks as the total number of partitions, not brokers.
>
> Jun
>
> On Tue, Oct 18, 2011 at 9:01 AM, Felix Giguere Villegas <
> felix.gigu...@mate1inc.com> wrote:
>
> > Hello everyone :) !
> >
> > I have trouble using the Kafka hadoop consumer included in
> > contrib/hadoop-consumer and I'd like to know if/how it is used at
> > LinkedIn or elsewhere? I would also like it if someone could confirm
> > or correct the assumptions I make below.
> >
> > Here's what I have so far:
> >
> > It works when pulling from one Kafka broker, but not when pulling
> > from many. There are two problems:
> >
> > The first problem concerns the offset files that the Map/Reduce job
> > takes as its input. From what I understand, these offset files
> > represent the offset to start reading from on each of the Kafka
> > brokers.
> >
> > To generate those files the first time (and thus start from offset
> > -1), we can go into contrib/hadoop-consumer/ and run:
> >
> > ./run-class.sh kafka.etl.impl.DataGenerator my-properties-file.properties
> >
> > The problem is that this DataGenerator class can take only one Kafka
> > broker in its parameters (the properties file) and thus generates
> > only one offset file.
> >
> > The Map/Reduce job will then spawn one map task for each offset file
> > it finds in its input directory, and each of these map tasks will
> > connect to a different Kafka broker. Since the DataGenerator can only
> > generate one offset file, the Map/Reduce job only spawns one map
> > task, which queries only one Kafka broker.
> >
> > Unless my assumptions are wrong or someone else provides a nice
> > alternative solution, I was planning to modify the DataGenerator
> > class so that it can generate multiple offset files, but for now, as
> > a manual work-around, I just duplicated the offset files and
> > specified a different Kafka broker in each.
> >
> > Other than that, I am thinking perhaps a more robust solution would
> > be to have ZK-based discovery of the available brokers. Again, I'm
> > curious to find out how this is done at LinkedIn or elsewhere?
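One way to script that work-around, instead of hand-duplicating the offset
files, might be to run the DataGenerator once per broker with a per-broker
properties file, along these lines. The kafka.server.uri and input property
names are guesses based on the example properties file that ships with the
hadoop-consumer, and every host name and path below is a placeholder, so
treat this as a sketch rather than something that has been run as-is:

#!/usr/bin/env bash
# Generate one offset file per broker by running the DataGenerator once
# per broker, each time with a properties file pointing at a single broker.
# kafka.server.uri and input are assumed property names; adjust them to
# whatever your my-properties-file.properties actually uses.
BROKERS="tcp://kafka01:9092 tcp://kafka02:9092 tcp://kafka03:9092"
TEMPLATE=my-properties-file.properties   # shared settings for every broker

i=0
for broker in $BROKERS; do
  props="broker-$i.properties"
  # Swap in this broker's URI and give it its own offset directory.
  sed -e "s|^kafka.server.uri=.*|kafka.server.uri=$broker|" \
      -e "s|^input=.*|input=/kafka/mr-input/broker-$i|" \
      "$TEMPLATE" > "$props"
  ./run-class.sh kafka.etl.impl.DataGenerator "$props"
  i=$((i + 1))
done

The M/R job could then be pointed at /kafka/mr-input/* (or the per-broker
offset files could be copied into one directory), so that its input
contains one offset file per broker and it spawns one map task for each.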
> > The second problem is when I run the M/R job. If I run it with the
> > multiple offset files I manually generated as its input, it does
> > spawn three map tasks, as expected, but it then fails with the
> > following error:
> >
> > java.io.IOException: java.io.EOFException
> >     at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:166)
> >     at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:30)
> >     at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:208)
> >     at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193)
> >     at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
> >     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
> >     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
> >     at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
> >     at java.security.AccessController.doPrivileged(Native Method)
> >     at javax.security.auth.Subject.doAs(Subject.java:396)
> >     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
> >     at org.apache.hadoop.mapred.Child.main(Child.java:264)
> > Caused by: java.io.EOFException
> >     at java.io.DataInputStream.readFully(DataInputStream.java:180)
> >     at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
> >     at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
> >     at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1945)
> >     at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2077)
> >     at org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:76)
> >     at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:128)
> >     ... 11 more
> >
> > It fails before writing anything whatsoever, and it fails repeatedly
> > for each map task until the JobTracker reaches the maximum number of
> > failures per task and marks the job as failed.
> >
> > I haven't figured this one out yet...
> >
> > Any help would be greatly appreciated :) !
> >
> > Thanks :) !!!!
> >
> > --
> > Felix
> >
>