It really looks like your mapper tasks may be failing to connect to your kafka server.
Here's a brief overview of what that demo job is doing so you can understand where the example may have gone wrong.

DataGenerator:

1. When DataGenerator is run, it needs the properties 'kafka.etl.topic' and 'kafka.server.uri' set in the properties file. When you run ./run-class.sh kafka.etl.impl.DataGenerator test/test.properties, you can tell that they're properly set by the output 'topic=<blah>' and 'server uri=<kafka server url>'. (There's a sketch of such a properties file after this walkthrough.)

2. The DataGenerator will create a bunch of dummy messages and pump them to that kafka server. Afterwards, it will write a file to HDFS at the path 'input', which you also set in the properties file. The file that is created will be named something like 1.dat.

3. 1.dat is a sequence file, so if it isn't compressed, you should be able to see its contents in plain text (see the inspection commands below). The contents essentially list the kafka server url, the partition number, the topic, and the offset.

4. In a real scenario, you'll probably create several of these files, one per broker and possibly per partition, but for this example you only need one file. Each file will spawn one mapper during the mapred step.

CopyJars:

1. This should copy the jars needed by kafka hadoop and push them into HDFS for the distributed cache. If the jars are copied locally instead of to the remote cluster, most likely HADOOP_CONF_DIR hasn't been set up correctly (see the note below). The script should probably set that environment itself; someone is welcome to change it so that it does.

SimpleKafkaETLJob:

1. This job then sets up the distributed classpath, and its input path should be the directory that 1.dat was written to. (The command to kick it off is shown below.)

2. Internally, the mappers will load 1.dat and use the connection data contained in it to connect to kafka. If a mapper is trying to connect to anything but your kafka server, then this file was written incorrectly.

3. The RecordReader wraps all of this and hides the connection details, so your Mapper sees a stream of Kafka messages rather than the contents of 1.dat.
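To make that concrete, here's a rough sketch of what test/test.properties might contain. Only 'kafka.etl.topic', 'kafka.server.uri', and 'input' are named above; every other key, and all of the values, are placeholders you'd adapt to your setup:

    # topic the demo writes to and reads back (placeholder value)
    kafka.etl.topic=SimpleTestEvent
    # broker uri the mappers will connect to; it must be reachable from every task node
    kafka.server.uri=tcp://hostA:9092
    # HDFS directory where DataGenerator writes 1.dat (also the job's input path)
    input=test/input
    # HDFS directory for the job's output (placeholder key and value)
    output=test/output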
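Since 1.dat is an uncompressed sequence file, you can eyeball it straight out of HDFS and confirm that the server uri recorded in it really is your kafka server. 'hadoop fs -text' knows how to decode sequence files (the path here assumes input=test/input from the sketch above):

    hadoop fs -ls test/input
    hadoop fs -text test/input/1.dat

If the uri in there points anywhere other than hostA:9092, that's your smoking gun.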
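On the CopyJars front: if you find the jars sitting on the local filesystem rather than in HDFS, export HADOOP_CONF_DIR so it points at the remote cluster's configuration before running the scripts (the exact path depends on your installation):

    export HADOOP_CONF_DIR=/path/to/remote-cluster/conf

That would also explain why you had to set mapred.job.tracker by hand; with the right conf directory in the environment, the job should pick up the jobtracker on its own.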
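Finally, assuming the ETL step follows the same pattern as the DataGenerator command above (the kafka.etl.impl package here is my guess, based on kafka.etl.impl.DataGenerator), it's kicked off with:

    ./run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties

Each file under 'input' (here, just 1.dat) becomes one map task.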
So please see if you can figure out what is wrong with your example, and feel free to beef up the README instructions to take your pitfalls into account.

Thanks,
-Richard

On Wed, Aug 31, 2011 at 12:02 PM, Ben Ciceron <b...@triggit.com> wrote:
> ok i could live with setting mapred.job.tracker manually for the code for now.
> This way it can connect now to the proper jobtracker.
>
> > The hadoop map tasks will need to connect to the kafka server port (the broker uri/port).
>
> i run the hadoop consumer on the same hostA where the kafka-server is running.
> each of the hosts in the hadoop cluster can telnet/nmap to port 9092 on hostA where the kafka-server is running.
> also HostA can connect to port 5181 on any host in the cluster.
>
> but each task fails with a similar connection issue:
> java.io.IOException: java.net.ConnectException: Connection refused
>         at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:155)
>         at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:14)
>         at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:210)
>         at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:195)
>         at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
>         at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:393)
>         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:326)
>         at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:396)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1074)
>         at org.apache.hadoop.mapred.Child.main(Child.java:262)
> Caused by: java.net.ConnectException: Connection refused
>         at sun.nio.ch.Net.connect(Native Method)
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
>         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:54)
>         at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:193)
>         at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:156)
>         at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:65)
>         at kafka.etl.KafkaETLContext.getOffsetRange(KafkaETLContext.java:209)
>         at kafka.etl.KafkaETLContext.<init>(KafkaETLContext.java:97)
>         at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:115)
>         ... 11 more