Richard, I read somewhere that the mappers write out the offset to the output dir, so that further attempts (after a task failure) can start from the right offset. I can see that the offset is generated, but where is the logic that reads it back and adjusts the offset for the next read? I wasn't able to find it.
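What I expected to find is something along these lines -- a purely hypothetical sketch, none of these names come from the kafka.etl code, I just want to confirm whether logic like this exists somewhere:

    // HYPOTHETICAL sketch of the resume logic I was looking for -- not
    // actual kafka.etl code; the checkpoint path layout is a guess.
    // Idea: before asking the broker for an offset range, check whether a
    // previous (failed) attempt checkpointed an offset under the output
    // dir, and resume from there instead of the offset in 1.dat.
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class OffsetResumeSketch {
        public static long resolveStartOffset(Configuration conf, Path outputDir,
                String topic, int partition, long defaultOffset) throws IOException {
            Path checkpoint = new Path(outputDir, "offsets/" + topic + "-" + partition);
            FileSystem fs = checkpoint.getFileSystem(conf);
            if (!fs.exists(checkpoint)) {
                return defaultOffset; // first attempt: use the offset from 1.dat
            }
            FSDataInputStream in = fs.open(checkpoint);
            try {
                return in.readLong(); // how far the earlier attempt got
            } finally {
                in.close();
            }
        }
    }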
Sam

On Aug 31, 2011, at 12:48 PM, Richard Park wrote:

> It really looks like your mapper tasks may be failing to connect to your
> kafka server.
>
> Here's a brief overview of what that demo job is doing, so you can understand
> where the example may have gone wrong.
>
> DataGenerator:
>
> 1. When DataGenerator is run, it needs the properties 'kafka.etl.topic'
>    and 'kafka.server.uri' set in the properties file. When you run
>    ./run-class.sh kafka.etl.impl.DataGenerator test/test.properties, you can
>    tell that they're properly set by the output 'topic=<blah>' and
>    'server uri=<kafka server url>'.
> 2. The DataGenerator will create a bunch of dummy messages and pump them to
>    that kafka server. Afterwards, it will write a file to HDFS at the path
>    'input', which you also set in the properties file. The file that is
>    created will be named something like 1.dat.
> 3. 1.dat is a sequence file, so if it isn't compressed, you should be able
>    to see its contents in plain text. The contents will essentially list the
>    kafka server url, the partition number, the topic, and the offset.
> 4. In a real scenario, you'll probably create several of these files, one
>    for each broker and possibly each partition, but for this example you
>    only need one file. Each file will spawn a mapper during the mapred step.
>
> CopyJars:
>
> 1. This should copy the necessary jars for kafka hadoop and push them into
>    HDFS for the distributed cache. If the jars are copied locally instead of
>    to a remote cluster, most likely HADOOP_CONF_DIR hasn't been set up
>    correctly. The environment should probably be set by the script, so
>    someone can change that.
>
> SimpleKafkaETLJob:
>
> 1. This job will then set up the distributed classpath, and the input path
>    should be the directory that 1.dat was written to.
> 2. Internally, the mappers will then load 1.dat and use the connection data
>    contained in it to connect to kafka. If it's trying to connect to
>    anything but your kafka server, then this file was incorrectly written.
> 3. The RecordReader wraps all of this and hides the connection details, so
>    your Mapper should see a stream of Kafka messages rather than the
>    contents of 1.dat.
>
> So please see if you can figure out what is wrong with your example, and
> feel free to beef up the README instructions to take into account your
> pitfalls.
>
> Thanks,
> -Richard
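Point 3 of the DataGenerator description above can be checked directly by dumping 1.dat. A small sketch: the key/value classes are read from the file header and instantiated reflectively, since the exact kafka.etl record types aren't shown in this thread, so this just prints each record's toString():

    // Dumps the records of a SequenceFile such as input/1.dat. The
    // key/value classes are discovered from the file header, so this
    // works without knowing the concrete kafka.etl types.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.util.ReflectionUtils;

    public class DumpEtlInput {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Path path = new Path(args[0]); // e.g. input/1.dat
            FileSystem fs = path.getFileSystem(conf);
            SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
            try {
                Writable key = (Writable) ReflectionUtils.newInstance(
                        reader.getKeyClass(), conf);
                Writable value = (Writable) ReflectionUtils.newInstance(
                        reader.getValueClass(), conf);
                while (reader.next(key, value)) {
                    // expect the server url, partition, topic and offset here
                    System.out.println(key + "\t" + value);
                }
            } finally {
                reader.close();
            }
        }
    }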
> On Wed, Aug 31, 2011 at 12:02 PM, Ben Ciceron <b...@triggit.com> wrote:
>
>> ok, i could live with setting mapred.job.tracker manually in the code for
>> now. This way it can now connect to the proper jobtracker.
>>
>>> The hadoop map tasks will need to connect to the kafka server port (the
>>> broker uri/port).
>>
>> i run the hadoop consumer on the same hostA where the kafka-server is
>> running.
>>
>> each of the hosts in the hadoop cluster can telnet/nmap to port 9092 on
>> hostA, where the kafka-server is running.
>> also, hostA can connect to port 5181 on any host in the cluster.
>>
>> but each task fails with a similar connection issue:
>>
>> java.io.IOException: java.net.ConnectException: Connection refused
>>     at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:155)
>>     at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:14)
>>     at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:210)
>>     at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:195)
>>     at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
>>     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:393)
>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:326)
>>     at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at javax.security.auth.Subject.doAs(Subject.java:396)
>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1074)
>>     at org.apache.hadoop.mapred.Child.main(Child.java:262)
>> Caused by: java.net.ConnectException: Connection refused
>>     at sun.nio.ch.Net.connect(Native Method)
>>     at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
>>     at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:54)
>>     at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:193)
>>     at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:156)
>>     at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:65)
>>     at kafka.etl.KafkaETLContext.getOffsetRange(KafkaETLContext.java:209)
>>     at kafka.etl.KafkaETLContext.<init>(KafkaETLContext.java:97)
>>     at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:115)
>>     ... 11 more

Sam William
sa...@stumbleupon.com
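For anyone hitting the same stack trace: the call that fails is SimpleConsumer.getOffsetsBefore, so a minimal standalone reproduction, run from one of the task nodes, separates a broker-reachability problem from a badly written 1.dat. The host, port, and topic below are placeholders for whatever your file actually contains, and the constructor/method signatures are assumed from the 0.7-era javaapi, so adjust as needed:

    // Standalone reproduction of the failing call in the trace above
    // (kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore). If this
    // throws ConnectException from a task node, the problem is network
    // reachability, not the ETL job itself. Host/port/topic are placeholders.
    import kafka.javaapi.consumer.SimpleConsumer;

    public class OffsetRangeCheck {
        public static void main(String[] args) {
            SimpleConsumer consumer =
                    new SimpleConsumer("hostA", 9092, 30000, 64 * 1024);
            try {
                // -2L asks for the earliest available offset, -1L for the latest
                long[] earliest = consumer.getOffsetsBefore("test", 0, -2L, 1);
                long[] latest = consumer.getOffsetsBefore("test", 0, -1L, 1);
                System.out.println("offset range: " + earliest[0] + " .. " + latest[0]);
            } finally {
                consumer.close();
            }
        }
    }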