Hello everyone :) !

I am having trouble using the Kafka Hadoop consumer included in
contrib/hadoop-consumer, and I'd like to know if and how it is used at
LinkedIn or elsewhere. I would also appreciate it if someone could confirm
or correct the assumptions I make below.

Here's what I have so far:

It works when pulling from one Kafka broker, but not when pulling from
many. There are two problems:

The first problem concerns the offset files that the Map/Reduce job takes as
its input. From what I understand, these offset files represent the offset
to start reading from on each of the Kafka brokers.

To generate those files the first time (and thus start from offset -1), we
can go into contrib/hadoop-consumer/ and run:

./run-class.sh kafka.etl.impl.DataGenerator my-properties-file.properties

The problem is that this DataGenerator class can take only one Kafka broker
in its parameters (the properties file) and thus generates only one offset
file.

The Map/Reduce job will then spawn one map task for each offset file it
finds in its input directory, and each of these map tasks will connect to a
different Kafka broker. Since the DataGenerator can only generate one offset
file, the Map/Reduce job only spawns one map task which queries only one
Kafka broker.

Unless my assumptions are wrong or someone else provides a nice alternative
solution, I was planning to modify the DataGenerator class so that it can
generate multiple offset files, but for now, as a manual work-around, I just
duplicated the offset files and specified a different Kafka broker in each.
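Here is a minimal sketch of the "one offset file per broker" idea, only to make the plan concrete. Everything in it is hypothetical: the OffsetFilePlanner class, the offsets-N.dat file names, the fixed partition 0 and the tab-separated request layout are my own placeholders, not what DataGenerator actually writes. It only computes which files would be needed and does not write anything to HDFS.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Arrays;

// Hypothetical helper, not part of contrib/hadoop-consumer.
class OffsetFilePlanner {

    // Returns one (file name -> request line) entry per broker URI, in input order.
    // Assumption: a request is "topic<TAB>brokerUri<TAB>partition<TAB>offset";
    // the real offset file format may differ.
    static Map<String, String> plan(String topic, List<String> brokerUris, long startOffset) {
        Map<String, String> files = new LinkedHashMap<>();
        for (int i = 0; i < brokerUris.size(); i++) {
            String fileName = "offsets-" + i + ".dat"; // placeholder naming scheme
            String request = topic + "\t" + brokerUris.get(i) + "\t0\t" + startOffset;
            files.put(fileName, request);
        }
        return files;
    }

    public static void main(String[] args) {
        List<String> brokers = Arrays.asList(
                "tcp://broker1:9092", "tcp://broker2:9092", "tcp://broker3:9092");
        // One entry per broker, so the M/R job would get one map task per broker.
        for (Map.Entry<String, String> e : plan("my-topic", brokers, -1L).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

With three broker URIs this yields three entries, which matches the one-map-task-per-offset-file behaviour described above.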

Other than that, I am thinking that a more robust solution would be
ZK-based discovery of the available brokers. Again, I'm curious to find
out how this is done at LinkedIn or elsewhere.
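To illustrate the discovery idea, here is a rough sketch of the step that would turn broker registrations into broker URIs. It assumes the registrations have already been read from ZooKeeper (that part is left out) and that each one ends in host:port, possibly with extra colon-separated fields in front. Both the registration layout and the tcp:// URI scheme are assumptions on my part, and BrokerRegistrationParser is a made-up name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; the registration format it accepts is an assumption.
class BrokerRegistrationParser {

    // Takes the last two colon-separated fields as host and port,
    // so both "host:port" and "something:host:port" are accepted.
    static String toUri(String registration) {
        String[] parts = registration.trim().split(":");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected host:port, got: " + registration);
        }
        String host = parts[parts.length - 2];
        int port = Integer.parseInt(parts[parts.length - 1]); // rejects non-numeric ports
        return "tcp://" + host + ":" + port;
    }

    // Converts every registration, preserving order.
    static List<String> toUris(List<String> registrations) {
        List<String> uris = new ArrayList<>();
        for (String registration : registrations) {
            uris.add(toUri(registration));
        }
        return uris;
    }

    public static void main(String[] args) {
        System.out.println(toUris(Arrays.asList("broker1:9092", "creator-1:10.0.0.5:9092")));
    }
}
```

The resulting list could then drive the generation of one offset file per broker, instead of a hand-maintained list of brokers.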

The second problem occurs when I run the M/R job. If I run it with the
multiple offset files I manually generated as its input, it does spawn three
map tasks, as expected, but it then fails with the following error:

java.io.IOException: java.io.EOFException
        at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:166)
        at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:30)
        at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:208)
        at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
        at org.apache.hadoop.mapred.Child.main(Child.java:264)
Caused by: java.io.EOFException
        at java.io.DataInputStream.readFully(DataInputStream.java:180)
        at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
        at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
        at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1945)
        at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2077)
        at org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:76)
        at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:128)
        ... 11 more


It fails before writing anything whatsoever, and it fails repeatedly for
each map task until the JobTracker reaches the maximum number of failures
per task and marks the job as failed.

I haven't figured this one out yet...
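One guess, which I have not verified: the bottom of the trace shows DataInputStream.readFully running out of bytes inside SequenceFile$Reader.next, which is what would happen if a record declares more bytes than the file actually contains (for example, if a binary offset file got altered or truncated while I was duplicating it by hand). The sketch below reproduces only that generic mechanism with plain JDK streams. LengthPrefixDemo and its simple length-plus-payload layout are made up for the illustration and are not the SequenceFile format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo of a length-prefixed record; not the SequenceFile layout.
class LengthPrefixDemo {

    // Writes a 4-byte length followed by the payload bytes.
    static byte[] writeRecord(String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(body.length);
            out.write(body);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Returns false when the declared length exceeds the bytes actually present,
    // which is the case where readFully throws EOFException.
    static boolean readsCleanly(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int length = in.readInt();
            in.readFully(new byte[length]);
            return true;
        } catch (EOFException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] record = writeRecord("tcp://broker1:9092");
        byte[] truncated = Arrays.copyOf(record, record.length - 3); // drop the last 3 bytes
        System.out.println("intact record reads cleanly: " + readsCleanly(record));
        System.out.println("truncated record reads cleanly: " + readsCleanly(truncated));
    }
}
```

If this is what is going on, regenerating each offset file with DataGenerator (once per broker) rather than editing copies might be worth trying, but again this is only a hypothesis.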

Any help would be greatly appreciated :) !

Thanks :) !!!!

--
Felix
