I would actually love for us to release the full ETL system we have for Kafka/Hadoop; it is just a matter of finding the time to get that code into a releasable shape.
The Hadoop team that maintains that code is pretty busy right now, but I am hoping we can find a way.

-Jay

On Tue, Oct 18, 2011 at 3:18 PM, Felix Giguere Villegas <felix.gigu...@mate1inc.com> wrote:

Thanks for your replies guys :)

@Jay: I thought about the Hadoop version mismatch too, because I've had the same problem before. I'll double check again to make sure I have the same versions of Hadoop everywhere, as the distributed Kafka cluster I was testing on is a new setup and I might have forgotten to put the Hadoop jars we use in it... I'm working part-time for now, so I probably won't touch this again until next week, but I'll keep you guys posted ASAP :)

@Richard: Thanks a lot for your description. That clears up the inaccuracies in my understanding. Is there any chance you guys might release the code you use to query ZK and create the appropriate offset files for each broker/partition pair? The Hadoop consumer provided in the source works with the setup we get from the quickstart guide, but the process you describe seems more appropriate for production use.

Thanks again :)

--
Felix

On Tue, Oct 18, 2011 at 5:52 PM, Richard Park <richard.b.p...@gmail.com> wrote:

Does the version in contrib contain the fixes for KAFKA-131? The offsets were incorrectly computed prior to that patch.

At LinkedIn, this is what we do, in a nutshell:

1. We connect to the ZooKeeper instance. With this we are able to discover the topics, the brokers, and the partitions of each broker.

2. For each topic we want to pull, we create files that contain the offset for each broker and partition. Each individual file covers a unique broker/partition pair. This is essentially what the DataGenerator does, except we use values from ZooKeeper. We take the output of the previous run of the Kafka job (the new offsets) and use those as the new offset files. If the old offset doesn't exist, we set a default starting offset.

3. We run the pull Hadoop job. One mapper per broker/partition pulls into HDFS using the SimpleConsumer (the KafkaETLRecordReader handles most of this). We query Kafka for the latest offset, and the mapper fetches from the Kafka broker until that latest offset is reached.

4. We group the data by hourly partition with a reduce step.

5. The Kafka Hadoop job's mapper emits new offsets for the next time we decide to pull the data. The pull occurs at regular scheduled intervals, quite frequently.

That's the gist of it. There are a few additional modifications we made to the Kafka job, including the ability to handle unavailable nodes, Avro schema resolution, and auditing.

Thanks,
-Richard

On Tue, Oct 18, 2011 at 2:03 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

Is it possible that this is due to a Hadoop version mismatch? Typically, if the client jar you pick up does not match the Hadoop version of your Hadoop cluster, you get an EOFException.

-Jay
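For anyone trying to reproduce the ZooKeeper-driven setup Richard describes in steps 1 and 2 above, here is a minimal, hypothetical sketch of the broker/partition lookup. It assumes the 0.7-era ZooKeeper layout (/brokers/ids/<id> holding a creator:host:port string, /brokers/topics/<topic>/<id> holding the partition count); the paths and value formats should be checked against the Kafka version actually in use.

import java.util.List;
import org.apache.zookeeper.ZooKeeper;

// Hypothetical sketch: list every broker/partition pair for one topic.
// Assumes the 0.7-era ZK layout: /brokers/ids/<id> -> "creator:host:port"
// and /brokers/topics/<topic>/<id> -> number of partitions on that broker.
public class BrokerPartitionLister {
    public static void main(String[] args) throws Exception {
        String zkConnect = args[0];   // e.g. "zkhost:2181"
        String topic = args[1];       // e.g. "my-topic"
        // A real implementation should wait for the connection event before reading.
        ZooKeeper zk = new ZooKeeper(zkConnect, 30000, null);
        try {
            List<String> brokerIds = zk.getChildren("/brokers/topics/" + topic, false);
            for (String id : brokerIds) {
                String info = new String(zk.getData("/brokers/ids/" + id, false, null), "UTF-8");
                String[] parts = info.split(":");   // expected: creator:host:port
                String hostPort = parts[1] + ":" + parts[2];
                int numPartitions = Integer.parseInt(new String(
                        zk.getData("/brokers/topics/" + topic + "/" + id, false, null), "UTF-8"));
                for (int p = 0; p < numPartitions; p++) {
                    // One broker/partition pair per line; feed this into whatever
                    // writes the per-pair offset files.
                    System.out.println(id + "\t" + hostPort + "\t" + p);
                }
            }
        } finally {
            zk.close();
        }
    }
}

The same listing could of course come from a config file instead of ZooKeeper; the point is that the set of broker/partition pairs is discovered rather than hard-coded in a properties file.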
On Tue, Oct 18, 2011 at 9:01 AM, Felix Giguere Villegas <felix.gigu...@mate1inc.com> wrote:

Hello everyone :) !

I have trouble using the Kafka Hadoop consumer included in contrib/hadoop-consumer, and I'd like to know if/how it is used at LinkedIn or elsewhere. I would also appreciate it if someone could confirm or correct the assumptions I make below.

Here's what I have so far:

It works when pulling from one Kafka broker, but not when pulling from many. There are two problems.

The first problem concerns the offset files that the Map/Reduce job takes as its input. From what I understand, these offset files represent the offset to start reading from on each of the Kafka brokers.

To generate those files the first time (and thus start from offset -1), we can go into contrib/hadoop-consumer/ and run:

./run-class.sh kafka.etl.impl.DataGenerator my-properties-file.properties

The problem is that this DataGenerator class can take only one Kafka broker in its parameters (the properties file) and thus generates only one offset file.

The Map/Reduce job will then spawn one map task for each offset file it finds in its input directory, and each of these map tasks will connect to a different Kafka broker. Since the DataGenerator can only generate one offset file, the Map/Reduce job only spawns one map task, which queries only one Kafka broker.

Unless my assumptions are wrong or someone else provides a nice alternative solution, I was planning to modify the DataGenerator class so that it can generate multiple offset files, but for now, as a manual work-around, I just duplicated the offset files and specified a different Kafka broker in each.

Other than that, I am thinking that perhaps a more robust solution would be to have ZK-based discovery of the available brokers. Again, I'm curious to find out how this is done at LinkedIn or elsewhere.
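A rough illustration of the modification described above, generating one starting-offset file per broker/partition pair instead of a single one. Everything in it is a hypothetical placeholder: the plain-text format and the BrokerPartition record are not what the contrib job actually reads (the real DataGenerator writes SequenceFiles consumed by kafka.etl.KafkaETLRecordReader), so writeOffsetFiles would need to be adapted to that format. The loop structure is the point.

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;

// Hypothetical sketch: emit one offset file per broker/partition pair so the
// M/R job spawns one map task per pair. The plain-text format is a placeholder;
// adapt it to the SequenceFile format the contrib hadoop-consumer expects.
public class MultiOffsetGenerator {

    // Placeholder record for one broker/partition pair (e.g. discovered from ZooKeeper).
    static class BrokerPartition {
        final String brokerId;
        final String hostPort;   // "host:port"
        final int partition;
        BrokerPartition(String brokerId, String hostPort, int partition) {
            this.brokerId = brokerId;
            this.hostPort = hostPort;
            this.partition = partition;
        }
    }

    /** Writes one starting-offset file per broker/partition pair into inputDir. */
    static void writeOffsetFiles(Iterable<BrokerPartition> pairs,
                                 String inputDir,
                                 long startingOffset) throws IOException {
        for (BrokerPartition bp : pairs) {
            String fileName = String.format("%s/offset-%s-%d.dat",
                    inputDir, bp.brokerId, bp.partition);
            try (PrintWriter out = new PrintWriter(new FileWriter(fileName))) {
                // One line: broker id, host:port, partition, offset (-1 = from the beginning).
                out.printf("%s\t%s\t%d\t%d%n",
                        bp.brokerId, bp.hostPort, bp.partition, startingOffset);
            }
        }
    }
}

Fed with the broker/partition pairs from the ZooKeeper sketch earlier in this thread, this would give the M/R job one input file, and therefore one map task, per pair.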
The second problem is when I run the M/R job: if I run it with the multiple offset files I manually generated as its input, it does spawn three map tasks, as expected, but it then fails with the following error:

java.io.IOException: java.io.EOFException
        at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:166)
        at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:30)
        at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:208)
        at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
        at org.apache.hadoop.mapred.Child.main(Child.java:264)
Caused by: java.io.EOFException
        at java.io.DataInputStream.readFully(DataInputStream.java:180)
        at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
        at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
        at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1945)
        at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2077)
        at org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:76)
        at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:128)
        ... 11 more

It fails before writing anything whatsoever, and it fails repeatedly for each map task until the JobTracker reaches the maximum number of failures per task and marks the job as failed.

I haven't figured this one out yet...

Any help would be greatly appreciated :) !

Thanks :) !!!!

--
Felix
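As a side note on Richard's step 3 above ("we query Kafka for the latest offset"): here is a minimal sketch of that lookup, assuming the 0.7-era kafka.javaapi.consumer.SimpleConsumer API (four-argument constructor, primitive getOffsetsBefore signature); the special time values are -1L for "latest" and -2L for "earliest". Verify the signatures against the Kafka version in use.

import kafka.javaapi.consumer.SimpleConsumer;

// Hypothetical sketch: ask the broker for the latest available offset so the
// mapper knows where to stop fetching. Assumes the 0.7-era Java API.
public class LatestOffsetLookup {
    public static long latestOffset(String host, int port, String topic, int partition) {
        SimpleConsumer consumer = new SimpleConsumer(host, port, 30 * 1000, 64 * 1024);
        try {
            // -1L asks for the latest offset; 1 means return at most one value.
            long[] offsets = consumer.getOffsetsBefore(topic, partition, -1L, 1);
            return offsets.length > 0 ? offsets[0] : 0L;
        } finally {
            consumer.close();
        }
    }
}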