Ah, interesting.

So then I guess it would make sense to have as many partitions as I have
nodes in my Hadoop cluster, or a multiple of the number of nodes in the
cluster, in order to maximize the import speed?

Also, can I have just one offset file? Or would I need to somehow generate
one offset per partition?
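
To check my understanding of the one-offset-per-partition model, here is a
minimal sketch in plain Java. It is not the DataGenerator code and uses no
Kafka or Hadoop API: OffsetPlanner, OffsetEntry, the broker URIs, the
tab-separated output, and the assumption that every broker holds the same
number of partitions are all hypothetical, for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, NOT part of contrib/hadoop-consumer.
final class OffsetPlanner {

    // One planned entry: which broker, which partition, where to start reading.
    static final class OffsetEntry {
        final String brokerUri;
        final int partition;
        final long startOffset;

        OffsetEntry(String brokerUri, int partition, long startOffset) {
            this.brokerUri = brokerUri;
            this.partition = partition;
            this.startOffset = startOffset;
        }

        @Override
        public String toString() {
            return brokerUri + "\t" + partition + "\t" + startOffset;
        }
    }

    // Builds one entry per (broker, partition) pair.
    // Assumption: every broker has the same number of partitions.
    static List<OffsetEntry> plan(List<String> brokerUris,
                                  int partitionsPerBroker,
                                  long startOffset) {
        List<OffsetEntry> entries = new ArrayList<>();
        for (String uri : brokerUris) {
            for (int p = 0; p < partitionsPerBroker; p++) {
                entries.add(new OffsetEntry(uri, p, startOffset));
            }
        }
        return entries;
    }

    public static void main(String[] args) {
        // Placeholder broker URIs; -1 is the initial offset mentioned below.
        List<String> brokers = Arrays.asList(
                "tcp://broker1:9092", "tcp://broker2:9092", "tcp://broker3:9092");
        for (OffsetEntry e : plan(brokers, 2, -1L)) {
            System.out.println(e);
        }
    }
}
```

If that is right, three brokers with two partitions each would give six
entries, so six map tasks rather than three.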

--
Felix



On Tue, Oct 18, 2011 at 12:36 PM, Jun Rao <jun...@gmail.com> wrote:

> For the offset part, there is one offset per partition. So there will be as
> many map tasks as the total number of partitions, not brokers.
>
> Jun
>
> On Tue, Oct 18, 2011 at 9:01 AM, Felix Giguere Villegas <
> felix.gigu...@mate1inc.com> wrote:
>
> > Hello everyone :) !
> >
> > I am having trouble using the Kafka Hadoop consumer included in
> > contrib/hadoop-consumer and I'd like to know if/how it is used at
> > LinkedIn or elsewhere. I would also like someone to confirm or correct
> > the assumptions I make below.
> >
> > Here's what I have so far:
> >
> > It works when pulling from one Kafka broker, but not when pulling from
> > many. There are two problems:
> >
> > The first problem concerns the offset files that the Map/Reduce job takes
> > as its input. From what I understand, these offset files represent the
> > offset to start reading from on each of the Kafka brokers.
> >
> > To generate those files the first time (and thus start from offset -1),
> > we can go in contrib/hadoop-consumer/ and run:
> >
> > ./run-class.sh kafka.etl.impl.DataGenerator my-properties-file.properties
> >
> > The problem is that this DataGenerator class can take only one Kafka
> > broker in its parameters (the properties file) and thus generates only
> > one offset file.
> >
> > The Map/Reduce job will then spawn one map task for each offset file it
> > finds in its input directory, and each of these map tasks will connect to
> > a different Kafka broker. Since the DataGenerator can only generate one
> > offset file, the Map/Reduce job only spawns one map task which queries
> > only one Kafka broker.
> >
> > Unless my assumptions are wrong or someone else provides a nice
> > alternative solution, I was planning to modify the DataGenerator class so
> > that it can generate multiple offset files, but for now, as a manual
> > work-around, I just duplicated the offset files and specified a different
> > Kafka broker in each.
> >
> > Other than that, I am thinking perhaps a more robust solution would be to
> > have ZK-based discovery of the available brokers. Again, I'm curious to
> > find out how this is done at LinkedIn or elsewhere?
> >
> > The second problem is when I run the M/R job. If I run it with the
> > multiple offset files I manually generated as its input, it does spawn
> > three map tasks, as expected, but it then fails with the following error:
> >
> > java.io.IOException: java.io.EOFException
> >        at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:166)
> >        at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:30)
> >        at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:208)
> >        at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193)
> >        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
> >        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
> >        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
> >        at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
> >        at java.security.AccessController.doPrivileged(Native Method)
> >        at javax.security.auth.Subject.doAs(Subject.java:396)
> >        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
> >        at org.apache.hadoop.mapred.Child.main(Child.java:264)
> > Caused by: java.io.EOFException
> >        at java.io.DataInputStream.readFully(DataInputStream.java:180)
> >        at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
> >        at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
> >        at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1945)
> >        at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2077)
> >        at org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:76)
> >        at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:128)
> >        ... 11 more
> >
> >
> > It fails before writing anything whatsoever, and it fails repeatedly for
> > each map task until the JobTracker reaches the maximum number of failures
> > per task and marks the job as failed.
> >
> > I haven't figured this one out yet...
> >
> > Any help would be greatly appreciated :) !
> >
> > Thanks :) !!!!
> >
> > --
> > Felix
> >
>
