Author: rding
Date: Fri Aug 27 18:32:35 2010
New Revision: 990223

URL: http://svn.apache.org/viewvc?rev=990223&view=rev
Log:
PIG-1483: [piggybank] Add HadoopJobHistoryLoader to the piggybank

Added:
    hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HadoopJobHistoryLoader.java
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/localhost_1272395783545_job_201004271216_9998_conf.xml
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/localhost_1272395783545_job_201004271216_9998_user_PigLatinTest.pig
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHadoopJobHistoryLoader.java
Modified:
    hadoop/pig/trunk/CHANGES.txt

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=990223&r1=990222&r2=990223&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Aug 27 18:32:35 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
 
 IMPROVEMENTS
 
+PIG-1483: [piggybank] Add HadoopJobHistoryLoader to the piggybank (rding)
+
 PIG-1555: [piggybank] add CSV Loader (dvryaboy)
 
 PIG-1518: multi file input format for loaders (yanz via rding)

Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HadoopJobHistoryLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HadoopJobHistoryLoader.java?rev=990223&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HadoopJobHistoryLoader.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HadoopJobHistoryLoader.java Fri Aug 27 18:32:35 2010
@@ -0,0 +1,583 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.piggybank.storage;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Stack;
+
+import javax.xml.parsers.SAXParserFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.DefaultJobHistoryParser;
+import org.apache.hadoop.mapred.JobHistory;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapred.JobHistory.JobInfo;
+import org.apache.hadoop.mapred.JobHistory.Keys;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.Tuple;
+import org.xml.sax.Attributes;
+import org.xml.sax.SAXException;
+import org.xml.sax.helpers.DefaultHandler;
+
+public class HadoopJobHistoryLoader extends LoadFunc {
+         
+    private static final Log LOG = LogFactory.getLog(HadoopJobHistoryLoader.class);
+       
+    private RecordReader<Text, MRJobInfo> reader;
+    
+    public HadoopJobHistoryLoader() {
+    }
+   
+    @SuppressWarnings("unchecked")
+    @Override
+    public InputFormat getInputFormat() throws IOException {     
+        return new HadoopJobHistoryInputFormat();
+    }
+
+    @Override
+    public Tuple getNext() throws IOException {
+        boolean notDone = false;
+        try {
+            notDone = reader.nextKeyValue();
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+        if (!notDone) {
+            return null;
+        }   
+        Tuple t = null;
+        try {
+            MRJobInfo val = (MRJobInfo)reader.getCurrentValue();
+            t = DefaultTupleFactory.getInstance().newTuple(3);
+            t.set(0, val.job);
+            t.set(1, val.mapTask);
+            t.set(2, val.reduceTask);
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+        return t;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void prepareToRead(RecordReader reader, PigSplit split)
+            throws IOException {
+        this.reader = (HadoopJobHistoryReader)reader;
+    }
+
+    @Override
+    public void setLocation(String location, Job job) throws IOException {
+        FileInputFormat.setInputPaths(job, location);
+        FileInputFormat.setInputPathFilter(job, JobHistoryPathFilter.class);
+    }
+    
+    public static class JobHistoryPathFilter implements PathFilter {
+        @Override
+        public boolean accept(Path p) {
+            String name = p.getName(); 
+            return !name.endsWith(".xml");
+        }       
+    }
+    
+    public static class HadoopJobHistoryInputFormat extends
+            FileInputFormat<Text, MRJobInfo> {
+
+        @Override
+        public RecordReader<Text, MRJobInfo> createRecordReader(
+                InputSplit split, TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            return new HadoopJobHistoryReader();
+        }
+
+        @Override
+        protected boolean isSplitable(JobContext context, Path filename) {
+            return false;
+        }         
+    }
+    
+    public static class HadoopJobHistoryReader extends
+            RecordReader<Text, MRJobInfo> {
+
+        private String location;
+        
+        private MRJobInfo value;
+        
+        private Configuration conf;
+                
+        @Override
+        public void close() throws IOException {            
+        }
+
+        @Override
+        public Text getCurrentKey() throws IOException, InterruptedException {
+            return null;
+        }
+
+        @Override
+        public MRJobInfo getCurrentValue() throws IOException,
+                InterruptedException {            
+            return value;
+        }
+
+        @Override
+        public float getProgress() throws IOException, InterruptedException {
+            return 0;
+        }
+
+        @Override
+        public void initialize(InputSplit split, TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            FileSplit fSplit = (FileSplit) split; 
+            Path p = fSplit.getPath();
+            location = p.toString();
+            LOG.info("location: " + location);    
+            conf = context.getConfiguration();
+        }
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            if (location != null) {
+                LOG.info("load: " + location);  
+                Path full = new Path(location);  
+                String[] jobDetails = 
+                    JobInfo.decodeJobHistoryFileName(full.getName()).split("_");
+                String jobId = jobDetails[2] + "_" + jobDetails[3] + "_"
+                        + jobDetails[4];
+                JobHistory.JobInfo job = new JobHistory.JobInfo(jobId); 
+ 
+                value = new MRJobInfo();
+                                            
+                FileSystem fs = full.getFileSystem(conf);
+                FileStatus fstat = fs.getFileStatus(full);
+                
+                LOG.info("file size: " + fstat.getLen());
+                DefaultJobHistoryParser.parseJobTasks(location, job,
+                        full.getFileSystem(conf)); 
+                LOG.info("job history parsed sucessfully");
+                HadoopJobHistoryLoader.parseJobHistory(conf, job, value);
+                LOG.info("get parsed job history");
+                
+                // parse Hadoop job xml file
+                Path parent = full.getParent();
+                String jobXml = jobDetails[0] + "_" + jobDetails[1] + "_"
+                        + jobDetails[2] + "_" + jobDetails[3] + "_"
+                        + jobDetails[4] + "_conf.xml";
+                Path p = new Path(parent, jobXml);  
+             
+                FSDataInputStream fileIn = fs.open(p);
+                Map<String, String> val = HadoopJobHistoryLoader
+                        .parseJobXML(fileIn);
+                for (String key : val.keySet()) {
+                    value.job.put(key, val.get(key));
+                }
+                
+                location = null;
+                return true;
+            }          
+            value = null;
+            return false;
+        }   
+    }
+    
+    //------------------------------------------------------------------------
+        
+    public static class MRJobInfo {
+        public Map<String, String> job;
+        public Map<String, String> mapTask;
+        public Map<String, String> reduceTask;
+        
+        public MRJobInfo() {
+            job = new HashMap<String, String>();
+            mapTask = new HashMap<String, String>();
+            reduceTask = new HashMap<String, String>();
+        }
+    }
+    
+    //--------------------------------------------------------------------------------------------
+    
+    public static final String TASK_COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter";
+    public static final String MAP_INPUT_RECORDS = "MAP_INPUT_RECORDS";
+    public static final String REDUCE_INPUT_RECORDS = "REDUCE_INPUT_RECORDS";
+    
+    /**
+     * Job Keys
+     */
+    public static enum JobKeys {
+        JOBTRACKERID, JOBID, JOBNAME, JOBTYPE, USER, SUBMIT_TIME, CONF_PATH, LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES,
+        STATUS, FINISH_TIME, FINISHED_MAPS, FINISHED_REDUCES, FAILED_MAPS, FAILED_REDUCES,
+        LAUNCHED_MAPS, LAUNCHED_REDUCES, RACKLOCAL_MAPS, DATALOCAL_MAPS, HDFS_BYTES_READ,
+        HDFS_BYTES_WRITTEN, FILE_BYTES_READ, FILE_BYTES_WRITTEN, COMBINE_OUTPUT_RECORDS,
+        COMBINE_INPUT_RECORDS, REDUCE_INPUT_GROUPS, REDUCE_INPUT_RECORDS, REDUCE_OUTPUT_RECORDS,
+        MAP_INPUT_RECORDS, MAP_OUTPUT_RECORDS, MAP_INPUT_BYTES, MAP_OUTPUT_BYTES, MAP_HDFS_BYTES_WRITTEN,
+        JOBCONF, JOB_PRIORITY, SHUFFLE_BYTES, SPILLED_RECORDS
+    }
+    
+    public static void parseJobHistory(Configuration jobConf, JobInfo jobInfo, MRJobInfo value) {
+        value.job.clear();
+        populateJob(jobInfo.getValues(), value.job);
+        value.mapTask.clear();
+        value.reduceTask.clear();
+        populateMapReduceTaskLists(value, jobInfo.getAllTasks());
+    }
+    
+    private static void populateJob (Map<JobHistory.Keys, String> jobC, Map<String, String> job) {
+        int size = jobC.size();
+        Iterator<Map.Entry<JobHistory.Keys, String>> kv = 
jobC.entrySet().iterator();
+        for (int i = 0; i < size; i++) {
+            Map.Entry<JobHistory.Keys, String> entry = 
(Map.Entry<JobHistory.Keys, String>) kv.next();
+            JobHistory.Keys key = entry.getKey();
+            String value = entry.getValue();
+            switch (key) {
+            case JOBTRACKERID: job.put(JobKeys.JOBTRACKERID.toString(), value); break;
+            case FINISH_TIME: job.put(JobKeys.FINISH_TIME.toString(), value); break;
+            case JOBID: job.put(JobKeys.JOBID.toString(), value); break;
+            case JOBNAME: job.put(JobKeys.JOBNAME.toString(), value); break;
+            case USER: job.put(JobKeys.USER.toString(), value); break;
+            case JOBCONF: job.put(JobKeys.JOBCONF.toString(), value); break;
+            case SUBMIT_TIME: job.put(JobKeys.SUBMIT_TIME.toString(), value); break;
+            case LAUNCH_TIME: job.put(JobKeys.LAUNCH_TIME.toString(), value); break;
+            case TOTAL_MAPS: job.put(JobKeys.TOTAL_MAPS.toString(), value); break;
+            case TOTAL_REDUCES: job.put(JobKeys.TOTAL_REDUCES.toString(), value); break;
+            case FAILED_MAPS: job.put(JobKeys.FAILED_MAPS.toString(), value); break;
+            case FAILED_REDUCES: job.put(JobKeys.FAILED_REDUCES.toString(), value); break;
+            case FINISHED_MAPS: job.put(JobKeys.FINISHED_MAPS.toString(), value); break;
+            case FINISHED_REDUCES: job.put(JobKeys.FINISHED_REDUCES.toString(), value); break;
+            case JOB_STATUS: job.put(JobKeys.STATUS.toString(), value); break;
+            case COUNTERS:
+                value.concat(",");
+                parseAndAddJobCounters(job, value);
+                break;
+            default: 
+                LOG.debug("JobHistory.Keys."+ key + " : NOT INCLUDED IN LOADER 
RETURN VALUE");
+                break;
+            }
+        }
+    }
+    
+    /*
+     * Parse and add the job counters
+     */
+    @SuppressWarnings("deprecation")
+    private static void parseAndAddJobCounters(Map<String, String> job, String counters) {
+        try {
+            Counters counterGroups = Counters.fromEscapedCompactString(counters);
+            for (Group otherGroup : counterGroups) {
+                Group group = counterGroups.getGroup(otherGroup.getName());
+                for (Counter otherCounter : otherGroup) {
+                    Counter counter = group.getCounterForName(otherCounter.getName());
+                    job.put(otherCounter.getName(), String.valueOf(counter.getValue()));
+                }
+            }
+        } catch (ParseException e) {
+           LOG.warn("Failed to parse job counters", e);
+        }
+    } 
+    
+    @SuppressWarnings("deprecation")
+    private static void populateMapReduceTaskLists (MRJobInfo value, 
+            Map<String, JobHistory.Task> taskMap) {
+                
+        Map<String, String> mapT = value.mapTask;
+        Map<String, String> reduceT = value.reduceTask;
+        long minMapRows = Long.MAX_VALUE;
+        long maxMapRows = 0;
+        long minMapTime = Long.MAX_VALUE;
+        long maxMapTime = 0;
+        long avgMapTime = 0;
+        long totalMapTime = 0;
+        int numberMaps = 0;
+        
+        long minReduceRows = Long.MAX_VALUE;
+        long maxReduceRows = 0;        
+        long minReduceTime = Long.MAX_VALUE;
+        long maxReduceTime = 0;
+        long avgReduceTime = 0;
+        long totalReduceTime = 0;
+        int numberReduces = 0;
+       
+        int num_tasks = taskMap.entrySet().size();
+        Iterator<Map.Entry<String, JobHistory.Task>> ti = 
taskMap.entrySet().iterator();
+        for (int i = 0; i < num_tasks; i++) {
+            Map.Entry<String, JobHistory.Task> entry = (Map.Entry<String, 
JobHistory.Task>) ti.next();
+            JobHistory.Task task = entry.getValue();
+            if (task.get(Keys.TASK_TYPE).equals("MAP")) {
+                Map<JobHistory.Keys, String> mapTask = task.getValues();
+                Map<JobHistory.Keys, String> successTaskAttemptMap  =  
getLastSuccessfulTaskAttempt(task);
+                // NOTE: Following would lead to less number of actual tasks 
collected in the tasklist array
+                if (successTaskAttemptMap != null) {
+                    mapTask.putAll(successTaskAttemptMap);
+                } else {
+                    LOG.info("Task:<" + task.get(Keys.TASKID) + "> is not 
successful - SKIPPING");
+                }
+                long duration = 0;
+                long startTime = 0;
+                long endTime = 0;
+                int size = mapTask.size();
+                numberMaps++;
+                Iterator<Map.Entry<JobHistory.Keys, String>> kv = 
mapTask.entrySet().iterator();
+                for (int j = 0; j < size; j++) {
+                    Map.Entry<JobHistory.Keys, String> mtc = kv.next();
+                    JobHistory.Keys key = mtc.getKey();
+                    String val = mtc.getValue();
+                    switch (key) {
+                    case START_TIME: 
+                        startTime = Long.valueOf(val);
+                        break;
+                    case FINISH_TIME:
+                        endTime = Long.valueOf(val);
+                        break;                    
+                    case COUNTERS: {
+                        try {
+                            Counters counters = Counters.fromEscapedCompactString(val);
+                            long rows = counters.getGroup(TASK_COUNTER_GROUP)
+                                    .getCounterForName(MAP_INPUT_RECORDS).getCounter();
+                            if (rows < minMapRows) minMapRows = rows;
+                            if (rows > maxMapRows) maxMapRows = rows;
+                        } catch (ParseException e) {
+                            LOG.warn("Failed to parse job counters", e);
+                        }
+                    }
+                    break;
+                    default: 
+                        LOG.warn("JobHistory.Keys." + key 
+                                + " : NOT INCLUDED IN PERFORMANCE ADVISOR MAP 
COUNTERS");
+                        break;
+                    }
+                }
+                duration = endTime - startTime;
+                if (minMapTime > duration) minMapTime = duration;
+                if (maxMapTime < duration) maxMapTime = duration;
+                totalMapTime += duration;        
+            } else if (task.get(Keys.TASK_TYPE).equals("REDUCE")) {
+                Map<JobHistory.Keys, String> reduceTask = task.getValues();
+                Map<JobHistory.Keys, String> successTaskAttemptMap  =  
getLastSuccessfulTaskAttempt(task);
+                // NOTE: Following would lead to less number of actual tasks 
collected in the tasklist array
+                if (successTaskAttemptMap != null) {
+                    reduceTask.putAll(successTaskAttemptMap);
+                } else {
+                    LOG.warn("Task:<" + task.get(Keys.TASKID) + "> is not 
successful - SKIPPING");
+                }
+                long duration = 0;
+                long startTime = 0;
+                long endTime = 0;
+                int size = reduceTask.size();
+                numberReduces++;
+
+                Iterator<Map.Entry<JobHistory.Keys, String>> kv = 
reduceTask.entrySet().iterator();
+                for (int j = 0; j < size; j++) {
+                    Map.Entry<JobHistory.Keys, String> rtc = kv.next();
+                    JobHistory.Keys key = rtc.getKey();
+                    String val = rtc.getValue();
+                    switch (key) {
+                    case START_TIME: 
+                        startTime = Long.valueOf(val);
+                        break;
+                    case FINISH_TIME:
+                        endTime = Long.valueOf(val);
+                        break;
+                    case COUNTERS: {
+                        try {
+                            Counters counters = Counters.fromEscapedCompactString(val);
+                            long rows = counters.getGroup(TASK_COUNTER_GROUP)
+                                    .getCounterForName(REDUCE_INPUT_RECORDS).getCounter();
+                            if (rows < minReduceRows) minReduceRows = rows;
+                            if (rows > maxReduceRows) maxReduceRows = rows;
+                        } catch (ParseException e) {
+                            LOG.warn("Failed to parse job counters", e);
+                        }
+                    }
+                    break;
+                    default: 
+                        LOG.warn("JobHistory.Keys." + key 
+                                + " : NOT INCLUDED IN PERFORMANCE ADVISOR 
REDUCE COUNTERS");
+                        break;
+                    }
+                }
+                
+                duration = endTime - startTime;
+                if (minReduceTime > duration) minReduceTime = duration;
+                if (maxReduceTime < duration) maxReduceTime = duration;
+                totalReduceTime += duration;
+
+            } else if (task.get(Keys.TASK_TYPE).equals("CLEANUP")) {
+                LOG.info("IGNORING TASK TYPE : " + task.get(Keys.TASK_TYPE));
+            } else {
+                LOG.warn("UNKNOWN TASK TYPE : " + task.get(Keys.TASK_TYPE));
+            }
+        }
+        if (numberMaps > 0) {
+            avgMapTime = (totalMapTime / numberMaps);
+            mapT.put("MIN_MAP_TIME", String.valueOf(minMapTime));
+            mapT.put("MAX_MAP_TIME", String.valueOf(maxMapTime));
+            mapT.put("MIN_MAP_INPUT_ROWS", String.valueOf(minMapRows));
+            mapT.put("MAX_MAP_INPUT_ROWS", String.valueOf(maxMapRows));
+            mapT.put("AVG_MAP_TIME", String.valueOf(avgMapTime));
+            mapT.put("NUMBER_MAPS", String.valueOf(numberMaps));
+        }
+        if (numberReduces > 0) {
+            avgReduceTime = (totalReduceTime / numberReduces);
+            reduceT.put("MIN_REDUCE_TIME", String.valueOf(minReduceTime));
+            reduceT.put("MAX_REDUCE_TIME", String.valueOf(maxReduceTime));
+            reduceT.put("AVG_REDUCE_TIME", String.valueOf(avgReduceTime));
+            reduceT.put("MIN_REDUCE_INPUT_ROWS", 
String.valueOf(minReduceTime));
+            reduceT.put("MAX_REDUCE_INPUT_ROWS", 
String.valueOf(maxReduceTime));            
+            reduceT.put("NUMBER_REDUCES", String.valueOf(numberReduces));
+        } else {
+            reduceT.put("NUMBER_REDUCES", String.valueOf(0));
+        }
+    }
+    
+    /*
+     * Get last successful task attempt to be added in the stats
+     */
+    private static Map<JobHistory.Keys, String> getLastSuccessfulTaskAttempt(
+            JobHistory.Task task) {
+
+        Map<String, JobHistory.TaskAttempt> taskAttempts = task
+                .getTaskAttempts();
+        int size = taskAttempts.size();
+        Iterator<Map.Entry<String, JobHistory.TaskAttempt>> kv = taskAttempts
+                .entrySet().iterator();
+        for (int i = 0; i < size; i++) {
+            // CHECK_IT: Only one SUCCESSFUL TASK ATTEMPT
+            Map.Entry<String, JobHistory.TaskAttempt> tae = kv.next();
+            JobHistory.TaskAttempt attempt = tae.getValue();
+            if (attempt.getValues().get(JobHistory.Keys.TASK_STATUS).equals(
+                    "SUCCESS")) {
+                return attempt.getValues();
+            }
+        }
+
+        return null;
+    }
+   
+    //-------------------------------------------------------------------------
+    /*
+     * Job xml keys
+     */
+    private static final Map<String, String> XML_KEYS;
+    
+    static {
+        XML_KEYS = new HashMap<String, String> ();       
+        XML_KEYS.put("group.name", "USER_GROUP");
+        XML_KEYS.put("user.name", "USER");
+        XML_KEYS.put("user.dir", "HOST_DIR");
+        XML_KEYS.put("mapred.job.queue.name", "QUEUE_NAME");
+        XML_KEYS.put("cluster", "CLUSTER");
+        XML_KEYS.put("jobName", "JOB_NAME");
+        XML_KEYS.put("pig.script.id", "PIG_SCRIPT_ID");
+        XML_KEYS.put("pig.script", "PIG_SCRIPT");
+        XML_KEYS.put("pig.hadoop.version", "HADOOP_VERSION");
+        XML_KEYS.put("pig.version", "PIG_VERSION");
+        XML_KEYS.put("pig.job.feature", "PIG_JOB_FEATURE");
+        XML_KEYS.put("pig.alias", "PIG_JOB_ALIAS"); 
+        XML_KEYS.put("pig.parent.jobid", "PIG_JOB_PARENTS");
+        XML_KEYS.put("pig.host", "HOST_NAME");        
+    }
+     
+    public static Map<String, String> parseJobXML(InputStream in) {
+        
+        HashMap<String, String> xmlMap = new HashMap<String, String>();
+        
+        try {
+            SAXParserFactory.newInstance().newSAXParser().parse(in,
+                    new JobXMLHandler(xmlMap));
+        } catch (Exception e) {
+            LOG.warn("Failed to parser job xml", e);
+        }
+                          
+        return xmlMap;
+    }
+        
+    private static class JobXMLHandler extends DefaultHandler {
+                  
+        private static final String NAME = "name";
+        private static final String VALUE = "value";
+        
+        private static Stack<String> tags = new Stack<String>();    
+        
+        private static String curTag;
+        
+        private static String key;
+ 
+        private static Map<String, String> xmlMap;
+                
+        public JobXMLHandler(Map<String, String> xml) {
+            xmlMap = xml;
+        }
+        
+        @Override
+        public void startElement(String uri, String localName,
+                String qName, Attributes attributes) throws SAXException {
+            tags.add(qName);
+            curTag = qName;
+        }
+        
+        @Override
+        public void endElement(String uri, String localName,
+                String qName) throws SAXException {
+            String tag = tags.pop();
+            if (tag == null || !tag.equalsIgnoreCase(qName)) {
+                throw new SAXException("Malformatted XML file: " + tag + " : "
+                        + qName);
+            }
+            curTag = null;
+        }
+        
+        public void characters(char[] ch, int start, int length)
+                throws SAXException {
+            if (tags.size() > 1) {
+                String s = new String(ch, start, length); 
+                if (curTag.equalsIgnoreCase(NAME)) {
+                    key = s;
+                }
+                if (curTag.equalsIgnoreCase(VALUE)) {
+                    String displayKey = XML_KEYS.get(key);
+                    if (displayKey != null) {
+                        xmlMap.put(displayKey, s);
+                    }
+                }
+            }
+        }
+    }
+}
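
For reference, a minimal Pig Latin sketch of how this loader can be invoked (assuming the piggybank jar has been registered; the history directory path here is illustrative, the three map fields mirror the tuple assembled in getNext(), and the key names come from JobKeys and XML_KEYS above):

    jobs = load '/mapred/history/done' using org.apache.pig.piggybank.storage.HadoopJobHistoryLoader()
               as (j:map[], m:map[], r:map[]);
    names = foreach jobs generate j#'JOBID', j#'PIG_SCRIPT_ID', j#'PIG_JOB_ALIAS';
    dump names;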

Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/localhost_1272395783545_job_201004271216_9998_conf.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/localhost_1272395783545_job_201004271216_9998_conf.xml?rev=990223&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/localhost_1272395783545_job_201004271216_9998_conf.xml (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/localhost_1272395783545_job_201004271216_9998_conf.xml Fri Aug 27 18:32:35 2010
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>pig.script.features</name><value>512</value></property>
+<property><name>mapred.job.name</name><value>PigLatin:Test.pig</value></property>
+<property><name>user.dir</name><value>/homes/test</value></property>
+<property><name>pig.version</name><value>0.8.0-dev</value></property>
+<property><name>os.arch</name><value>i386</value></property>
+<property><name>user.name</name><value>test</value></property>
+<property><name>pig.alias</name><value>d</value></property>
+<property><name>hadoop.job.history.user.location</name><value>/mapred/history</value></property>
+<property><name>user.country</name><value>US</value></property>
+<property><name>pig.script</name><value>YSA9IGxvYWQgJy91c2VyL3BpZy90ZXN0cy9kYXRhL3NpbmdsZWZpbGUvc3R1ZGVudHRhYjEwayc7YiA9IGxvYWQgJy91c2VyL3BpZy90ZXN0cy9kYXRhL3NpbmdsZWZpbGUvdm90ZXJ0YWIxMGsnO2MgPSBvcmRlciBhIGJ5ICQwO2QgPSBvcmRlciBiIGJ5ICQwO3N0b3JlIGMgaW50byAnL3VzZXIvcGlnL291dC9qaWFueW9uZy4xMjgyMTE3OTMzL01lcmdlSm9pbl8zLm91dC5pbnRlcm1lZGlhdGUxJztzdG9yZSBkIGludG8gJy91c2VyL3BpZy9vdXQvamlhbnlvbmcuMTI4MjExNzkzMy9NZXJnZUpvaW5fMy5vdXQuaW50ZXJtZWRpYXRlMic7ZXhlYztlID0gbG9hZCAnL3VzZXIvcGlnL291dC9qaWFueW9uZy4xMjgyMTE3OTMzL01lcmdlSm9pbl8zLm91dC5pbnRlcm1lZGlhdGUxJztmID0gbG9hZCAnL3VzZXIvcGlnL291dC9qaWFueW9uZy4xMjgyMTE3OTMzL01lcmdlSm9pbl8zLm91dC5pbnRlcm1lZGlhdGUyJztpID0gZmlsdGVyIGYgYnkgJDIgIT0gJ2RlbW9jcmF0JztnID0gam9pbiBlIGJ5ICQwLCBpIGJ5ICQwIHVzaW5nICJtZXJnZSI7c3RvcmUgZyBpbnRvICcvdXNlci9waWcvb3V0L2ppYW55b25nLjEyODIxMTc5MzMvTWVyZ2VKb2luXzMub3V0Jzs=</value></property>
+<property><name>pig.hadoop.version</name><value>0.20.2</value></property>
+<property><name>pig.script.id</name><value>3eb62180-5473-4301-aa22-467bd685d466</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>exectype</name><value>mapreduce</value></property>
+<property><name>user.language</name><value>en</value></property>
+<property><name>pig.command.line</name><value>-logfile 
/out/pigtest/test.1282117933/Test.log 
/out/pigtest/test.1282117933/Test.pig</value></property>
+<property><name>jobName</name><value>PigLatin:Test.pig</value></property>
+<property><name>pig.quantilesFile</name><value>pigsample_25615779_1282127657849</value></property>
+<property><name>debug</name><value>INFO</value></property>
+<property><name>mapred.map.tasks</name><value>1</value></property>
+<property><name>user.home</name><value>/homes/test</value></property>
+<property><name>pig.job.feature</name><value>ORDER_BY</value></property>
+<property><name>pig.reduce.key.type</name><value>50</value></property>
+<property><name>pig.parent.jobid</name><value>job_201004271216_9995</value></property>
+<property><name>group.name</name><value>users</value></property>
+<property><name>opt.multiquery</name><value>true</value></property>
+<property><name>verbose</name><value>false</value></property>
+<property><name>pig.default.replication.factor</name><value>1</value></property>
+<property><name>cluster</name><value>localhost:10000</value></property>
+<property><name>mapred.mapper.new-api</name><value>true</value></property>
+<property><name>user.timezone</name><value>America/Los_Angeles</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>file.encoding</name><value>UTF-8</value></property>
+</configuration>

Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/localhost_1272395783545_job_201004271216_9998_user_PigLatinTest.pig
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/localhost_1272395783545_job_201004271216_9998_user_PigLatinTest.pig?rev=990223&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/localhost_1272395783545_job_201004271216_9998_user_PigLatinTest.pig (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/localhost_1272395783545_job_201004271216_9998_user_PigLatinTest.pig Fri Aug 27 18:32:35 2010
@@ -0,0 +1,22 @@
+Meta VERSION="1" .
+Job JOBID="job_201004271216_9998" JOBNAME="PigLatin:Test\.pig" USER="test" 
SUBMIT_TIME="1282127662662" 
JOBCONF="hdfs://localhost:10000/mapredsystem/hadoop/mapredsystem/job_201004271216_9998/job\.xml"
 .
+Job JOBID="job_201004271216_9998" JOB_PRIORITY="NORMAL" .
+Job JOBID="job_201004271216_9998" LAUNCH_TIME="1282127662963" TOTAL_MAPS="1" 
TOTAL_REDUCES="1" JOB_STATUS="PREP" .
+Task TASKID="task_201004271216_9998_m_000002" TASK_TYPE="SETUP" 
START_TIME="1282127663890" SPLITS="" .
+MapAttempt TASK_TYPE="SETUP" TASKID="task_201004271216_9998_m_000002" 
TASK_ATTEMPT_ID="attempt_201004271216_9998_m_000002_0" 
START_TIME="1282127664437" 
TRACKER_NAME="tracker_wilbur14\.labs\.corp\.sp1\.yahoo\.com:localhost\.localdomain/127\.0\.0\.1:46026"
 HTTP_PORT="60160" .
+MapAttempt TASK_TYPE="SETUP" TASKID="task_201004271216_9998_m_000002" 
TASK_ATTEMPT_ID="attempt_201004271216_9998_m_000002_0" TASK_STATUS="SUCCESS" 
FINISH_TIME="1282127665818" 
HOSTNAME="/default-rack/wilbur14\.labs\.corp\.sp1\.yahoo\.com" 
STATE_STRING="setup" 
COUNTERS="{(org\.apache\.hadoop\.mapred\.Task$Counter)(Map-Reduce 
Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}" .
+Task TASKID="task_201004271216_9998_m_000002" TASK_TYPE="SETUP" 
TASK_STATUS="SUCCESS" FINISH_TIME="1282127666954" 
COUNTERS="{(org\.apache\.hadoop\.mapred\.Task$Counter)(Map-Reduce 
Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}" .
+Job JOBID="job_201004271216_9998" JOB_STATUS="RUNNING" .
+Task TASKID="task_201004271216_9998_m_000000" TASK_TYPE="MAP" 
START_TIME="1282127666955" 
SPLITS="/default-rack/wilbur15\.labs\.corp\.sp1\.yahoo\.com,/default-rack/wilbur12\.labs\.corp\.sp1\.yahoo\.com,/default-rack/wilbur10\.labs\.corp\.sp1\.yahoo\.com"
 .
+MapAttempt TASK_TYPE="MAP" TASKID="task_201004271216_9998_m_000000" 
TASK_ATTEMPT_ID="attempt_201004271216_9998_m_000000_0" 
START_TIME="1282127667085" 
TRACKER_NAME="tracker_wilbur14\.labs\.corp\.sp1\.yahoo\.com:localhost\.localdomain/127\.0\.0\.1:46026"
 HTTP_PORT="60160" .
+MapAttempt TASK_TYPE="MAP" TASKID="task_201004271216_9998_m_000000" 
TASK_ATTEMPT_ID="attempt_201004271216_9998_m_000000_0" TASK_STATUS="SUCCESS" 
FINISH_TIME="1282127669505" 
HOSTNAME="/default-rack/wilbur14\.labs\.corp\.sp1\.yahoo\.com" STATE_STRING="" 
COUNTERS="{(FileSystemCounters)(FileSystemCounters)[(FILE_BYTES_READ)(FILE_BYTES_READ)(73)][(HDFS_BYTES_READ)(HDFS_BYTES_READ)(338211)][(FILE_BYTES_WRITTEN)(FILE_BYTES_WRITTEN)(478249)]}{(org\.apache\.hadoop\.mapred\.Task$Counter)(Map-Reduce
 Framework)[(COMBINE_OUTPUT_RECORDS)(Combine output 
records)(0)][(MAP_INPUT_RECORDS)(Map input 
records)(10000)][(SPILLED_RECORDS)(Spilled 
Records)(10000)][(MAP_OUTPUT_BYTES)(Map output 
bytes)(458211)][(COMBINE_INPUT_RECORDS)(Combine input 
records)(0)][(MAP_OUTPUT_RECORDS)(Map output records)(10000)]}" .
+Task TASKID="task_201004271216_9998_m_000000" TASK_TYPE="MAP" 
TASK_STATUS="SUCCESS" FINISH_TIME="1282127669960" 
COUNTERS="{(FileSystemCounters)(FileSystemCounters)[(FILE_BYTES_READ)(FILE_BYTES_READ)(73)][(HDFS_BYTES_READ)(HDFS_BYTES_READ)(338211)][(FILE_BYTES_WRITTEN)(FILE_BYTES_WRITTEN)(478249)]}{(org\.apache\.hadoop\.mapred\.Task$Counter)(Map-Reduce
 Framework)[(COMBINE_OUTPUT_RECORDS)(Combine output 
records)(0)][(MAP_INPUT_RECORDS)(Map input 
records)(10000)][(SPILLED_RECORDS)(Spilled 
Records)(10000)][(MAP_OUTPUT_BYTES)(Map output 
bytes)(458211)][(COMBINE_INPUT_RECORDS)(Combine input 
records)(0)][(MAP_OUTPUT_RECORDS)(Map output records)(10000)]}" .
+Task TASKID="task_201004271216_9998_r_000000" TASK_TYPE="REDUCE" 
START_TIME="1282127669961" SPLITS="" .
+ReduceAttempt TASK_TYPE="REDUCE" TASKID="task_201004271216_9998_r_000000" 
TASK_ATTEMPT_ID="attempt_201004271216_9998_r_000000_0" 
START_TIME="1282127670026" 
TRACKER_NAME="tracker_wilbur14\.labs\.corp\.sp1\.yahoo\.com:localhost\.localdomain/127\.0\.0\.1:46026"
 HTTP_PORT="60160" .
+ReduceAttempt TASK_TYPE="REDUCE" TASKID="task_201004271216_9998_r_000000" 
TASK_ATTEMPT_ID="attempt_201004271216_9998_r_000000_0" TASK_STATUS="SUCCESS" 
SHUFFLE_FINISHED="1282127677253" SORT_FINISHED="1282127677447" 
FINISH_TIME="1282127679044" 
HOSTNAME="/default-rack/wilbur14\.labs\.corp\.sp1\.yahoo\.com" 
STATE_STRING="reduce > reduce" 
COUNTERS="{(FileSystemCounters)(FileSystemCounters)[(FILE_BYTES_READ)(FILE_BYTES_READ)(478217)][(FILE_BYTES_WRITTEN)(FILE_BYTES_WRITTEN)(478217)][(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(338211)]}{(org\.apache\.hadoop\.mapred\.Task$Counter)(Map-Reduce
 Framework)[(REDUCE_INPUT_GROUPS)(Reduce input 
groups)(676)][(COMBINE_OUTPUT_RECORDS)(Combine output 
records)(0)][(REDUCE_SHUFFLE_BYTES)(Reduce shuffle 
bytes)(0)][(REDUCE_OUTPUT_RECORDS)(Reduce output 
records)(10000)][(SPILLED_RECORDS)(Spilled 
Records)(10000)][(COMBINE_INPUT_RECORDS)(Combine input 
records)(0)][(REDUCE_INPUT_RECORDS)(Reduce input records)(10000)]}" .
+Task TASKID="task_201004271216_9998_r_000000" TASK_TYPE="REDUCE" 
TASK_STATUS="SUCCESS" FINISH_TIME="1282127681977" 
COUNTERS="{(FileSystemCounters)(FileSystemCounters)[(FILE_BYTES_READ)(FILE_BYTES_READ)(478217)][(FILE_BYTES_WRITTEN)(FILE_BYTES_WRITTEN)(478217)][(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(338211)]}{(org\.apache\.hadoop\.mapred\.Task$Counter)(Map-Reduce
 Framework)[(REDUCE_INPUT_GROUPS)(Reduce input 
groups)(676)][(COMBINE_OUTPUT_RECORDS)(Combine output 
records)(0)][(REDUCE_SHUFFLE_BYTES)(Reduce shuffle 
bytes)(0)][(REDUCE_OUTPUT_RECORDS)(Reduce output 
records)(10000)][(SPILLED_RECORDS)(Spilled 
Records)(10000)][(COMBINE_INPUT_RECORDS)(Combine input 
records)(0)][(REDUCE_INPUT_RECORDS)(Reduce input records)(10000)]}" .
+Task TASKID="task_201004271216_9998_m_000001" TASK_TYPE="CLEANUP" 
START_TIME="1282127681977" SPLITS="" .
+MapAttempt TASK_TYPE="CLEANUP" TASKID="task_201004271216_9998_m_000001" 
TASK_ATTEMPT_ID="attempt_201004271216_9998_m_000001_0" 
START_TIME="1282127682043" 
TRACKER_NAME="tracker_wilbur14\.labs\.corp\.sp1\.yahoo\.com:localhost\.localdomain/127\.0\.0\.1:46026"
 HTTP_PORT="60160" .
+MapAttempt TASK_TYPE="CLEANUP" TASKID="task_201004271216_9998_m_000001" 
TASK_ATTEMPT_ID="attempt_201004271216_9998_m_000001_0" TASK_STATUS="SUCCESS" 
FINISH_TIME="1282127683242" 
HOSTNAME="/default-rack/wilbur14\.labs\.corp\.sp1\.yahoo\.com" 
STATE_STRING="cleanup" 
COUNTERS="{(org\.apache\.hadoop\.mapred\.Task$Counter)(Map-Reduce 
Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}" .
+Task TASKID="task_201004271216_9998_m_000001" TASK_TYPE="CLEANUP" 
TASK_STATUS="SUCCESS" FINISH_TIME="1282127684981" 
COUNTERS="{(org\.apache\.hadoop\.mapred\.Task$Counter)(Map-Reduce 
Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}" .
+Job JOBID="job_201004271216_9998" FINISH_TIME="1282127684981" 
JOB_STATUS="SUCCESS" FINISHED_MAPS="1" FINISHED_REDUCES="1" FAILED_MAPS="0" 
FAILED_REDUCES="0" 
COUNTERS="{(org\.apache\.hadoop\.mapred\.JobInProgress$Counter)(Job Counters 
)[(TOTAL_LAUNCHED_REDUCES)(Launched reduce 
tasks)(1)][(RACK_LOCAL_MAPS)(Rack-local map 
tasks)(1)][(TOTAL_LAUNCHED_MAPS)(Launched map 
tasks)(1)]}{(FileSystemCounters)(FileSystemCounters)[(FILE_BYTES_READ)(FILE_BYTES_READ)(478290)][(HDFS_BYTES_READ)(HDFS_BYTES_READ)(338211)][(FILE_BYTES_WRITTEN)(FILE_BYTES_WRITTEN)(956466)][(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(338211)]}{(org\.apache\.hadoop\.mapred\.Task$Counter)(Map-Reduce
 Framework)[(REDUCE_INPUT_GROUPS)(Reduce input 
groups)(676)][(COMBINE_OUTPUT_RECORDS)(Combine output 
records)(0)][(MAP_INPUT_RECORDS)(Map input 
records)(10000)][(REDUCE_SHUFFLE_BYTES)(Reduce shuffle 
bytes)(0)][(REDUCE_OUTPUT_RECORDS)(Reduce output 
records)(10000)][(SPILLED_RECORDS)(Spilled Records)(20000)][(MAP_OUTPUT_BYTES)
 (Map output bytes)(458211)][(MAP_OUTPUT_RECORDS)(Map output 
records)(10000)][(COMBINE_INPUT_RECORDS)(Combine input 
records)(0)][(REDUCE_INPUT_RECORDS)(Reduce input records)(10000)]}" .

Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHadoopJobHistoryLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHadoopJobHistoryLoader.java?rev=990223&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHadoopJobHistoryLoader.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHadoopJobHistoryLoader.java Fri Aug 27 18:32:35 2010
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.piggybank.test.storage;
+
+
+import static org.junit.Assert.*;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHadoopJobHistoryLoader {
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+    private static final String INPUT_DIR = 
+        "src/test/java/org/apache/pig/piggybank/test/data/jh";
+    
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testHadoopJHLoader() throws Exception {
+        PigServer pig = new PigServer(ExecType.LOCAL);
+        pig.registerQuery("a = load '" + INPUT_DIR 
+                + "' using 
org.apache.pig.piggybank.storage.HadoopJobHistoryLoader() " 
+                + "as (j:map[], m:map[], r:map[]);");
+        Iterator<Tuple> iter = pig.openIterator("a");
+        
+        assertTrue(iter.hasNext());
+        
+        Tuple t = iter.next();
+        
+        Map<String, Object> job = (Map<String, Object>)t.get(0);
+        
+        assertEquals("3eb62180-5473-4301-aa22-467bd685d466", 
(String)job.get("PIG_SCRIPT_ID"));
+        assertEquals("job_201004271216_9998", (String)job.get("JOBID"));
+        assertEquals("job_201004271216_9995", 
(String)job.get("PIG_JOB_PARENTS"));
+        assertEquals("0.8.0-dev", (String)job.get("PIG_VERSION"));
+        assertEquals("0.20.2", (String)job.get("HADOOP_VERSION"));
+        assertEquals("d", (String)job.get("PIG_JOB_ALIAS"));
+        assertEquals("PigLatin:Test.pig", job.get("JOBNAME"));
+        assertEquals("ORDER_BY", (String)job.get("PIG_JOB_FEATURE"));
+        assertEquals("1", (String)job.get("TOTAL_MAPS"));
+        assertEquals("1", (String)job.get("TOTAL_REDUCES"));              
+    }
+}
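
The test above inspects only the first field (the job map). The second and third fields carry the task statistics aggregated in populateMapReduceTaskLists; a minimal Pig Latin sketch of reading them, assuming the same test data directory and the key names the loader populates ('NUMBER_MAPS', 'AVG_MAP_TIME', 'AVG_REDUCE_TIME'):

    jobs = load 'src/test/java/org/apache/pig/piggybank/test/data/jh' using org.apache.pig.piggybank.storage.HadoopJobHistoryLoader()
               as (j:map[], m:map[], r:map[]);
    stats = foreach jobs generate j#'JOBID', m#'NUMBER_MAPS', m#'AVG_MAP_TIME', r#'AVG_REDUCE_TIME';
    dump stats;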

