Hello Ranjini, PFA the source code for the XML input format, along with the output and the input I have used.
ATTACHED FILES DESCRIPTION:
(1) emp.xml --> input data for testing
(2) emp_op.tar.gz --> output; results of the map-only job (I have set the number of reducers = 0)
(3) src.tar --> the source files (please create a project in Eclipse and paste the files). The code is written with the appropriate package and source folder.

RUNNING THE JOB:
hadoop jar xml.jar com.xg.hadoop.training.mr.MyDriver -D START_TAG=\<Employee\> -D END_TAG=\</Employee\> emp op

Explanation of the above command:
(1) xml.jar is the jar name, which we create either through Eclipse, Maven, or Ant.
(2) com.xg.hadoop.training.mr.MyDriver is the fully qualified driver class name, i.e. MyDriver resides under the package com.xg.hadoop.training.mr.
(3) -D START_TAG=<Employee> will not work as written, because without escaping, the angle brackets get swallowed and Employee ends up being treated as an input directory, which is not the case. Therefore you need to escape them, and that's why it is written as -D START_TAG=\<Employee\>; you can see that both angle brackets are escaped. The same explanation goes for -D END_TAG.
(4) emp is the input data, which is present on HDFS.
(5) op is the output directory, which will be created as part of the MapReduce job.

NOTE: The number of reducers is explicitly set to ZERO, so this MapReduce job will always run zero reduce tasks. You need to change the driver code if you want that to be different.

Hope this helps and you will be able to solve your problem. In case you face any difficulty, please feel free to contact me.
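For reference, here is a minimal driver sketch along the lines of the attached MyDriver. This is an assumption about its shape, not the attached code itself: it assumes the usual Tool/ToolRunner pattern (which is what lets the escaped -D START_TAG / -D END_TAG options land in the job Configuration), hard-codes zero reduce tasks for the map-only run, and wires in the MyParserMapper / XmlInputFormatNew classes quoted later in this thread purely for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // ToolRunner/GenericOptionsParser has already stripped the -D options,
        // so args[0]/args[1] are "emp" and "op", and the tags are in the conf:
        Configuration conf = getConf();
        String startTag = conf.get("START_TAG");   // "<Employee>" (consumed by the input format)
        String endTag = conf.get("END_TAG");       // "</Employee>"

        Job job = new Job(conf, "xml input format demo");
        job.setJarByClass(MyDriver.class);
        job.setMapperClass(MyParserMapper.class);
        job.setInputFormatClass(XmlInputFormatNew.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);                   // map-only job
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new MyDriver(), args));
    }
}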
Regards,
Som Shekhar Sharma
+91-8197243810

On Tue, Dec 17, 2013 at 5:42 PM, Ranjini Rathinam <[email protected]> wrote:
> Hi,
>
> I have attached the code. Please verify.
>
> Please suggest. I am using hadoop 0.20 version.
>
> import java.io.IOException;
> import java.util.logging.Level;
> import java.util.logging.Logger;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
> //import org.apache.hadoop.mapreduce.lib.input.XmlInputFormat;
>
> public class ParserDriverMain {
>
>     public static void main(String[] args) {
>         try {
>             runJob(args[0], args[1]);
>         } catch (IOException ex) {
>             Logger.getLogger(ParserDriverMain.class.getName()).log(Level.SEVERE, null, ex);
>         }
>     }
>
>     // The code is mostly self explanatory. You need to define the starting and
>     // ending tag to split a record from the xml file, and it can be defined in
>     // the following lines:
>     //
>     // conf.set("xmlinput.start", "<startingTag>");
>     // conf.set("xmlinput.end", "</endingTag>");
>
>     public static void runJob(String input, String output) throws IOException {
>
>         Configuration conf = new Configuration();
>
>         conf.set("xmlinput.start", "<Employee>");
>         conf.set("xmlinput.end", "</Employee>");
>         conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
>
>         Job job = new Job(conf, "jobName");
>
>         input = "/user/hduser/Ran/";
>         output = "/user/task/Sales/";
>         FileInputFormat.setInputPaths(job, input);
>         job.setJarByClass(ParserDriverMain.class);
>         job.setMapperClass(MyParserMapper.class);
>         job.setNumReduceTasks(1);
>         job.setInputFormatClass(XmlInputFormatNew.class);
>         job.setOutputKeyClass(NullWritable.class);
>         job.setOutputValueClass(Text.class);
>         Path outPath = new Path(output);
>         FileOutputFormat.setOutputPath(job, outPath);
>         FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
>         if (dfs.exists(outPath)) {
>             dfs.delete(outPath, true);
>         }
>
>         try {
>             job.waitForCompletion(true);
>         } catch (InterruptedException ex) {
>             Logger.getLogger(ParserDriverMain.class.getName()).log(Level.SEVERE, null, ex);
>         } catch (ClassNotFoundException ex) {
>             Logger.getLogger(ParserDriverMain.class.getName()).log(Level.SEVERE, null, ex);
>         }
>     }
> }
>
>
> import java.io.IOException;
> import java.util.logging.Level;
> import java.util.logging.Logger;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Mapper;
> import org.jdom.Document;
> import org.jdom.Element;
> import org.jdom.JDOMException;
> import org.jdom.input.SAXBuilder;
> import java.io.Reader;
> import java.io.StringReader;
>
> /**
>  *
>  * @author root
>  */
> public class MyParserMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
>
>     @Override
>     public void map(LongWritable key, Text value1, Context context) throws IOException, InterruptedException {
>
>         String xmlString = value1.toString();
>         System.out.println("xmlString====" + xmlString);
>         SAXBuilder builder = new SAXBuilder();
>         Reader in = new StringReader(xmlString);
>         String value = "";
>         try {
>             Document doc = builder.build(in);
>             Element root = doc.getRootElement();
>
>             // String tag1 = root.getChild("tag").getChild("tag1").getTextTrim();
>             // String tag2 = root.getChild("tag").getChild("tag1").getChild("tag2").getTextTrim();
>             value = root.getChild("id").getChild("ename").getChild("dept").getChild("sal").getChild("location").getTextTrim();
>             context.write(NullWritable.get(), new Text(value));
>         } catch (JDOMException ex) {
>             Logger.getLogger(MyParserMapper.class.getName()).log(Level.SEVERE, null, ex);
>         } catch (IOException ex) {
>             Logger.getLogger(MyParserMapper.class.getName()).log(Level.SEVERE, null, ex);
>         }
>     }
> }
>
> import java.io.IOException;
> import org.apache.hadoop.fs.FSDataInputStream;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.DataOutputBuffer;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.Mapper;
> import org.apache.hadoop.mapreduce.Reducer;
> import org.apache.hadoop.mapreduce.InputSplit;
> import org.apache.hadoop.mapreduce.RecordReader;
> import org.apache.hadoop.mapreduce.TaskAttemptContext;
> import org.apache.hadoop.mapreduce.TaskAttemptID;
> import org.apache.hadoop.mapreduce.lib.input.FileSplit;
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
>
> /**
>  * Reads records that are delimited by a specific begin/end tag.
>  */
> public class XmlInputFormatNew extends TextInputFormat {
>
>     public static final String START_TAG_KEY = "<Employee>";
>     public static final String END_TAG_KEY = "</Employee>";
>
>     @Override
>     public RecordReader<LongWritable, Text> createRecordReader(InputSplit is, TaskAttemptContext tac) {
>         return new XmlRecordReader();
>     }
>
>     public static class XmlRecordReader extends RecordReader<LongWritable, Text> {
>         private byte[] startTag;
>         private byte[] endTag;
>         private long start;
>         private long end;
>         private FSDataInputStream fsin;
>         private DataOutputBuffer buffer = new DataOutputBuffer();
>         private LongWritable key = new LongWritable();
>         private Text value = new Text();
>
>         @Override
>         public void initialize(InputSplit is, TaskAttemptContext tac) throws IOException, InterruptedException {
>             FileSplit fileSplit = (FileSplit) is;
>             startTag = tac.getConfiguration().get(START_TAG_KEY).getBytes("utf-8");
>             endTag = tac.getConfiguration().get(END_TAG_KEY).getBytes("utf-8");
>
>             start = fileSplit.getStart();
>             end = start + fileSplit.getLength();
>             Path file = fileSplit.getPath();
>
>             FileSystem fs = file.getFileSystem(tac.getConfiguration());
>             fsin = fs.open(fileSplit.getPath());
>             fsin.seek(start);
>         }
>
>         @Override
>         public boolean nextKeyValue() throws IOException, InterruptedException {
>             if (fsin.getPos() < end) {
>                 if (readUntilMatch(startTag, false)) {
>                     try {
>                         buffer.write(startTag);
>                         if (readUntilMatch(endTag, true)) {
>                             value.set(buffer.getData(), 0, buffer.getLength());
>                             key.set(fsin.getPos());
>                             return true;
>                         }
>                     } finally {
>                         buffer.reset();
>                     }
>                 }
>             }
>             return false;
>         }
>
>         @Override
>         public LongWritable getCurrentKey() throws IOException, InterruptedException {
>             return key;
>         }
>
>         @Override
>         public Text getCurrentValue() throws IOException, InterruptedException {
>             return value;
>         }
>
>         @Override
>         public float getProgress() throws IOException, InterruptedException {
>             return (fsin.getPos() - start) / (float) (end - start);
>         }
>
>         @Override
>         public void close() throws IOException {
>             fsin.close();
>         }
>
>         private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
>             int i = 0;
>             while (true) {
>                 int b = fsin.read();
>                 // end of file:
>                 if (b == -1) return false;
>                 // save to buffer:
>                 if (withinBlock) buffer.write(b);
>
>                 // check if we're matching:
>                 if (b == match[i]) {
>                     i++;
>                     if (i >= match.length) return true;
>                 } else i = 0;
>                 // see if we've passed the stop point:
>                 if (!withinBlock && i == 0 && fsin.getPos() >= end) return false;
>             }
>         }
>     }
> }
>
> then also following error has occured , please help.
>
> hduser@localhost:~$ hadoop jar xml.jar ParserDriverMain Ran Sales
> 13/12/17 15:02:01 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
> 13/12/17 15:02:01 INFO input.FileInputFormat: Total input paths to process : 1
> 13/12/17 15:02:01 INFO mapred.JobClient: Running job: job_201312161706_0021
> 13/12/17 15:02:02 INFO mapred.JobClient:  map 0% reduce 0%
> 13/12/17 15:02:12 INFO mapred.JobClient: Task Id : attempt_201312161706_0021_m_000000_0, Status : FAILED
> Error: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected
> 13/12/17 15:02:18 INFO mapred.JobClient: Task Id : attempt_201312161706_0021_m_000000_1, Status : FAILED
> Error: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected
> 13/12/17 15:02:24 INFO mapred.JobClient: Task Id : attempt_201312161706_0021_m_000000_2, Status : FAILED
> Error: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected
> 13/12/17 15:02:33 INFO mapred.JobClient: Job complete: job_201312161706_0021
> 13/12/17 15:02:33 INFO mapred.JobClient: Counters: 3
> 13/12/17 15:02:33 INFO mapred.JobClient:   Job Counters
> 13/12/17 15:02:33 INFO mapred.JobClient:     Launched map tasks=4
> 13/12/17 15:02:33 INFO mapred.JobClient:     Data-local map tasks=4
> 13/12/17 15:02:33 INFO mapred.JobClient:     Failed map tasks=1
> hduser@localhost:~$
>
> Regards
> Ranjini R
>
> On Tue, Dec 17, 2013 at 3:20 PM, unmesha sreeveni <[email protected]> wrote:
>>
>> Mine is working properly.
>> Output
>> 13/12/17 15:18:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>> 13/12/17 15:18:13 WARN conf.Configuration: session.id is deprecated. Instead, use dfs.metrics.session-id
>> 13/12/17 15:18:13 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
>> 13/12/17 15:18:13 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
>> 13/12/17 15:18:13 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
>> 13/12/17 15:18:13 INFO input.FileInputFormat: Total input paths to process : 1
>> 13/12/17 15:18:13 INFO mapred.LocalJobRunner: OutputCommitter set in config null
>> 13/12/17 15:18:13 INFO mapred.JobClient: Running job: job_local2063093851_0001
>> 13/12/17 15:18:13 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
>> 13/12/17 15:18:13 INFO mapred.LocalJobRunner: Waiting for map tasks
>> 13/12/17 15:18:13 INFO mapred.LocalJobRunner: Starting task: attempt_local2063093851_0001_m_000000_0
>> 13/12/17 15:18:13 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead
>> 13/12/17 15:18:13 INFO util.ProcessTree: setsid exited with exit code 0
>> 13/12/17 15:18:13 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@109c4289
>> 13/12/17 15:18:13 INFO mapred.MapTask: Processing split: file:/home/sreeveni/myfiles/xml/conf:0+217
>> 13/12/17 15:18:13 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
>> 13/12/17 15:18:13 INFO mapred.MapTask: io.sort.mb = 100
>> 13/12/17 15:18:13 INFO mapred.MapTask: data buffer = 79691776/99614720
>> 13/12/17 15:18:13 INFO mapred.MapTask: record buffer = 262144/327680
>> ‘<property>
>> <name>dfs.replication</name>
>> <value>1</value>
>> </property>‘
>> ‘<property>
>> <name>dfs</name>
>> <value>2</value>
>> </property>‘
>> 13/12/17 15:18:13 INFO mapred.LocalJobRunner:
>> 13/12/17 15:18:13 INFO mapred.MapTask: Starting flush of map output
>> 13/12/17 15:18:13 INFO mapred.MapTask: Finished spill 0
>> 13/12/17 15:18:13 INFO mapred.Task: Task:attempt_local2063093851_0001_m_000000_0 is done. And is in the process of commiting
>> 13/12/17 15:18:13 INFO mapred.LocalJobRunner:
>> 13/12/17 15:18:13 INFO mapred.Task: Task 'attempt_local2063093851_0001_m_000000_0' done.
>> 13/12/17 15:18:13 INFO mapred.LocalJobRunner: Finishing task: attempt_local2063093851_0001_m_000000_0
>> 13/12/17 15:18:13 INFO mapred.LocalJobRunner: Map task executor complete.
>> 13/12/17 15:18:13 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead
>> 13/12/17 15:18:13 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1bf54903
>> 13/12/17 15:18:13 INFO mapred.LocalJobRunner:
>> 13/12/17 15:18:13 INFO mapred.Merger: Merging 1 sorted segments
>> 13/12/17 15:18:13 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 30 bytes
>> 13/12/17 15:18:13 INFO mapred.LocalJobRunner:
>> 13/12/17 15:18:13 INFO mapred.Task: Task:attempt_local2063093851_0001_r_000000_0 is done. And is in the process of commiting
>> 13/12/17 15:18:13 INFO mapred.LocalJobRunner:
>> 13/12/17 15:18:13 INFO mapred.Task: Task attempt_local2063093851_0001_r_000000_0 is allowed to commit now
>> 13/12/17 15:18:13 INFO output.FileOutputCommitter: Saved output of task 'attempt_local2063093851_0001_r_000000_0' to /home/sreeveni/myfiles/xmlOut
>> 13/12/17 15:18:13 INFO mapred.LocalJobRunner: reduce > reduce
>> 13/12/17 15:18:13 INFO mapred.Task: Task 'attempt_local2063093851_0001_r_000000_0' done.
>> 13/12/17 15:18:14 INFO mapred.JobClient:  map 100% reduce 100%
>> 13/12/17 15:18:14 INFO mapred.JobClient: Job complete: job_local2063093851_0001
>> 13/12/17 15:18:14 INFO mapred.JobClient: Counters: 20
>> 13/12/17 15:18:14 INFO mapred.JobClient:   File System Counters
>> 13/12/17 15:18:14 INFO mapred.JobClient:     FILE: Number of bytes read=780
>> 13/12/17 15:18:14 INFO mapred.JobClient:     FILE: Number of bytes written=185261
>> 13/12/17 15:18:14 INFO mapred.JobClient:     FILE: Number of read operations=0
>> 13/12/17 15:18:14 INFO mapred.JobClient:     FILE: Number of large read operations=0
>> 13/12/17 15:18:14 INFO mapred.JobClient:     FILE: Number of write operations=0
>> 13/12/17 15:18:14 INFO mapred.JobClient:   Map-Reduce Framework
>> 13/12/17 15:18:14 INFO mapred.JobClient:     Map input records=2
>> 13/12/17 15:18:14 INFO mapred.JobClient:     Map output records=2
>> 13/12/17 15:18:14 INFO mapred.JobClient:     Map output bytes=24
>> 13/12/17 15:18:14 INFO mapred.JobClient:     Input split bytes=101
>> 13/12/17 15:18:14 INFO mapred.JobClient:     Combine input records=0
>> 13/12/17 15:18:14 INFO mapred.JobClient:     Combine output records=0
>> 13/12/17 15:18:14 INFO mapred.JobClient:     Reduce input groups=2
>> 13/12/17 15:18:14 INFO mapred.JobClient:     Reduce shuffle bytes=0
>> 13/12/17 15:18:14 INFO mapred.JobClient:     Reduce input records=2
>> 13/12/17 15:18:14 INFO mapred.JobClient:     Reduce output records=4
>> 13/12/17 15:18:14 INFO mapred.JobClient:     Spilled Records=4
>> 13/12/17 15:18:14 INFO mapred.JobClient:     CPU time spent (ms)=0
>> 13/12/17 15:18:14 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
>> 13/12/17 15:18:14 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
>> 13/12/17 15:18:14 INFO mapred.JobClient:     Total committed heap usage (bytes)=446431232
>>
>> Can u list ur jar files?
>>
>> On Tue, Dec 17, 2013 at 3:11 PM, unmesha sreeveni <[email protected]> wrote:
>>>
>>> wait let me check out :)
>>>
>>> On Tue, Dec 17, 2013 at 3:09 PM, Ranjini Rathinam <[email protected]> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I am trying to process xml via mapreduce. and output should be in text format.
>>>>
>>>> I am using hadoop 0.20
>>>>
>>>> the following error has occured , the link provided
>>>>
>>>> https://github.com/studhadoop/xmlparsing-hadoop/blob/master/XmlParser11.java
>>>>
>>>> I have used the Package org.apache.hadoop.mapreduce.lib. only.
>>>>
>>>> then also following error has occured , please help.
>>>>
>>>> hduser@localhost:~$ hadoop jar xml.jar ParserDriverMain Ran Sales
>>>> 13/12/17 15:02:01 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
>>>> 13/12/17 15:02:01 INFO input.FileInputFormat: Total input paths to process : 1
>>>> 13/12/17 15:02:01 INFO mapred.JobClient: Running job: job_201312161706_0021
>>>> 13/12/17 15:02:02 INFO mapred.JobClient:  map 0% reduce 0%
>>>> 13/12/17 15:02:12 INFO mapred.JobClient: Task Id : attempt_201312161706_0021_m_000000_0, Status : FAILED
>>>> Error: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected
>>>> 13/12/17 15:02:18 INFO mapred.JobClient: Task Id : attempt_201312161706_0021_m_000000_1, Status : FAILED
>>>> Error: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected
>>>> 13/12/17 15:02:24 INFO mapred.JobClient: Task Id : attempt_201312161706_0021_m_000000_2, Status : FAILED
>>>> Error: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected
>>>> 13/12/17 15:02:33 INFO mapred.JobClient: Job complete: job_201312161706_0021
>>>> 13/12/17 15:02:33 INFO mapred.JobClient: Counters: 3
>>>> 13/12/17 15:02:33 INFO mapred.JobClient:   Job Counters
>>>> 13/12/17 15:02:33 INFO mapred.JobClient:     Launched map tasks=4
>>>> 13/12/17 15:02:33 INFO mapred.JobClient:     Data-local map tasks=4
>>>> 13/12/17 15:02:33 INFO mapred.JobClient:     Failed map tasks=1
>>>> hduser@localhost:~$
>>>>
>>>> thanks in advance.
>>>>
>>>> Ranjini
>>>>
>>>
>>> --
>>> Thanks & Regards
>>>
>>> Unmesha Sreeveni U.B
>>> Junior Developer
>>>
>>
>> --
>> Thanks & Regards
>>
>> Unmesha Sreeveni U.B
>> Junior Developer
>>
>
emp.xml:

<Employee>
  <fname>som</fname>
  <lname>shekhar</lname>
</Employee>
<Employee>
  <fname>som</fname>
  <mname>sharma</mname>
  <lname>shekhar</lname>
</Employee>
<Employee>
  <fname>som</fname>
  <mname>sharma</mname>
  <lname>shekhar</lname>
  <dob>25081981</dob>
</Employee>
<Employee>
  <fname>som</fname>
  <mname>sharma</mname>
  <lname>shekhar</lname>
  <dob>25081981</dob>
  <address>Bangalore</address>
</Employee>
<Employee>
  <fname>som</fname>
  <mname>sharma</mname>
  <lname>shekhar</lname>
  <dob>25081981</dob>
  <number>8197243810</number>
</Employee>
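For reference, a minimal JDOM sketch (a hypothetical helper, not part of the attached sources) showing how one <Employee> record like those above can be pulled apart once the XML input format hands it to the mapper. Since fname, mname and lname are siblings directly under <Employee>, each field is read with its own getChildTextTrim() call rather than by chaining getChild():

import java.io.IOException;
import java.io.StringReader;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.input.SAXBuilder;

public class EmployeeRecordParser {

    // Turns one <Employee>...</Employee> block (the Text value handed to the
    // mapper) into a tab-separated line; optional fields come back as null.
    public static String toTabSeparated(String xmlRecord) throws JDOMException, IOException {
        Document doc = new SAXBuilder().build(new StringReader(xmlRecord));
        Element employee = doc.getRootElement();             // the <Employee> element itself
        String fname = employee.getChildTextTrim("fname");
        String mname = employee.getChildTextTrim("mname");   // null for records without <mname>
        String lname = employee.getChildTextTrim("lname");
        return fname + "\t" + (mname == null ? "" : mname) + "\t" + lname;
    }
}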
Attachments:
- emp_op.tar.gz (GNU Zip compressed data)
- src.tar / XML_INPUT_FORMAT.gz (GNU Zip compressed data)
