Author: rding
Date: Fri Aug 27 18:32:35 2010
New Revision: 990223

URL: http://svn.apache.org/viewvc?rev=990223&view=rev
Log:
PIG-1483: [piggybank] Add HadoopJobHistoryLoader to the piggybank
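For context: the new loader reads Hadoop JobTracker history files (the companion *_conf.xml job configuration is excluded from the input splits by a path filter and parsed separately for Pig-related properties) and emits one tuple per job containing three maps: job-level statistics, aggregated map-task statistics, and aggregated reduce-task statistics. The sketch below shows one way the loader might be invoked; it follows the unit test added in this commit, but the history directory path and the keys printed are illustrative only, not part of the patch.

    import java.util.Iterator;
    import java.util.Map;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class JobHistoryExample {
        @SuppressWarnings("unchecked")
        public static void main(String[] args) throws Exception {
            // Local mode for illustration; the history directory below is hypothetical.
            PigServer pig = new PigServer(ExecType.LOCAL);
            pig.registerQuery("jh = load '/mapred/history/done' using "
                    + "org.apache.pig.piggybank.storage.HadoopJobHistoryLoader() "
                    + "as (j:map[], m:map[], r:map[]);");
            Iterator<Tuple> it = pig.openIterator("jh");
            while (it.hasNext()) {
                Tuple t = it.next();
                // Field 0 holds the job-level map (JOBID, USER, STATUS, PIG_* properties, counters).
                Map<String, Object> job = (Map<String, Object>) t.get(0);
                System.out.println(job.get("JOBID") + "\t" + job.get("USER") + "\t" + job.get("STATUS"));
            }
        }
    }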
Added:
    hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HadoopJobHistoryLoader.java
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/localhost_1272395783545_job_201004271216_9998_conf.xml
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/localhost_1272395783545_job_201004271216_9998_user_PigLatinTest.pig
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHadoopJobHistoryLoader.java
Modified:
    hadoop/pig/trunk/CHANGES.txt

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=990223&r1=990222&r2=990223&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Aug 27 18:32:35 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
 IMPROVEMENTS
 
+PIG-1483: [piggybank] Add HadoopJobHistoryLoader to the piggybank (rding)
+
 PIG-1555: [piggybank] add CSV Loader (dvryaboy)
 
 PIG-1518: multi file input format for loaders (yanz via rding)
 

Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HadoopJobHistoryLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HadoopJobHistoryLoader.java?rev=990223&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HadoopJobHistoryLoader.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HadoopJobHistoryLoader.java Fri Aug 27 18:32:35 2010
@@ -0,0 +1,583 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.pig.piggybank.storage; + +import java.io.IOException; +import java.io.InputStream; +import java.text.ParseException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Stack; + +import javax.xml.parsers.SAXParserFactory; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.DefaultJobHistoryParser; +import org.apache.hadoop.mapred.JobHistory; +import org.apache.hadoop.mapred.Counters.Counter; +import org.apache.hadoop.mapred.Counters.Group; +import org.apache.hadoop.mapred.JobHistory.JobInfo; +import org.apache.hadoop.mapred.JobHistory.Keys; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.pig.LoadFunc; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; +import org.apache.pig.data.DefaultTupleFactory; +import org.apache.pig.data.Tuple; +import org.xml.sax.Attributes; +import org.xml.sax.SAXException; +import org.xml.sax.helpers.DefaultHandler; + +public class HadoopJobHistoryLoader extends LoadFunc { + + private static final Log LOG = LogFactory.getLog(HadoopJobHistoryLoader.class); + + private RecordReader<Text, MRJobInfo> reader; + + public HadoopJobHistoryLoader() { + } + + @SuppressWarnings("unchecked") + @Override + public InputFormat getInputFormat() throws IOException { + return new HadoopJobHistoryInputFormat(); + } + + @Override + public Tuple getNext() throws IOException { + boolean notDone = false; + try { + notDone = reader.nextKeyValue(); + } catch (InterruptedException e) { + throw new IOException(e); + } + if (!notDone) { + return null; + } + Tuple t = null; + try { + MRJobInfo val = (MRJobInfo)reader.getCurrentValue(); + t = DefaultTupleFactory.getInstance().newTuple(3); + t.set(0, val.job); + t.set(1, val.mapTask); + t.set(2, val.reduceTask); + } catch (InterruptedException e) { + throw new IOException(e); + } + return t; + } + + @SuppressWarnings("unchecked") + @Override + public void prepareToRead(RecordReader reader, PigSplit split) + throws IOException { + this.reader = (HadoopJobHistoryReader)reader; + } + + @Override + public void setLocation(String location, Job job) throws IOException { + FileInputFormat.setInputPaths(job, location); + FileInputFormat.setInputPathFilter(job, JobHistoryPathFilter.class); + } + + public static class JobHistoryPathFilter implements PathFilter { + @Override + public boolean accept(Path p) { + String name = p.getName(); + return !name.endsWith(".xml"); + } + } + + public static class HadoopJobHistoryInputFormat extends + FileInputFormat<Text, MRJobInfo> { + + @Override + public RecordReader<Text, MRJobInfo> createRecordReader( + InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + return new HadoopJobHistoryReader(); + } + + @Override + 
protected boolean isSplitable(JobContext context, Path filename) { + return false; + } + } + + public static class HadoopJobHistoryReader extends + RecordReader<Text, MRJobInfo> { + + private String location; + + private MRJobInfo value; + + private Configuration conf; + + @Override + public void close() throws IOException { + } + + @Override + public Text getCurrentKey() throws IOException, InterruptedException { + return null; + } + + @Override + public MRJobInfo getCurrentValue() throws IOException, + InterruptedException { + return value; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return 0; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + FileSplit fSplit = (FileSplit) split; + Path p = fSplit.getPath(); + location = p.toString(); + LOG.info("location: " + location); + conf = context.getConfiguration(); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (location != null) { + LOG.info("load: " + location); + Path full = new Path(location); + String[] jobDetails = + JobInfo.decodeJobHistoryFileName(full.getName()).split("_"); + String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + + jobDetails[4]; + JobHistory.JobInfo job = new JobHistory.JobInfo(jobId); + + value = new MRJobInfo(); + + FileSystem fs = full.getFileSystem(conf); + FileStatus fstat = fs.getFileStatus(full); + + LOG.info("file size: " + fstat.getLen()); + DefaultJobHistoryParser.parseJobTasks(location, job, + full.getFileSystem(conf)); + LOG.info("job history parsed sucessfully"); + HadoopJobHistoryLoader.parseJobHistory(conf, job, value); + LOG.info("get parsed job history"); + + // parse Hadoop job xml file + Path parent = full.getParent(); + String jobXml = jobDetails[0] + "_" + jobDetails[1] + "_" + + jobDetails[2] + "_" + jobDetails[3] + "_" + + jobDetails[4] + "_conf.xml"; + Path p = new Path(parent, jobXml); + + FSDataInputStream fileIn = fs.open(p); + Map<String, String> val = HadoopJobHistoryLoader + .parseJobXML(fileIn); + for (String key : val.keySet()) { + value.job.put(key, val.get(key)); + } + + location = null; + return true; + } + value = null; + return false; + } + } + + //------------------------------------------------------------------------ + + public static class MRJobInfo { + public Map<String, String> job; + public Map<String, String> mapTask; + public Map<String, String> reduceTask; + + public MRJobInfo() { + job = new HashMap<String, String>(); + mapTask = new HashMap<String, String>(); + reduceTask = new HashMap<String, String>(); + } + } + + //-------------------------------------------------------------------------------------------- + + public static final String TASK_COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter"; + public static final String MAP_INPUT_RECORDS = "MAP_INPUT_RECORDS"; + public static final String REDUCE_INPUT_RECORDS = "REDUCE_INPUT_RECORDS"; + + /** + * Job Keys + */ + public static enum JobKeys { + JOBTRACKERID, JOBID, JOBNAME, JOBTYPE, USER, SUBMIT_TIME, CONF_PATH, LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, + STATUS, FINISH_TIME, FINISHED_MAPS, FINISHED_REDUCES, FAILED_MAPS, FAILED_REDUCES, + LAUNCHED_MAPS, LAUNCHED_REDUCES, RACKLOCAL_MAPS, DATALOCAL_MAPS, HDFS_BYTES_READ, + HDFS_BYTES_WRITTEN, FILE_BYTES_READ, FILE_BYTES_WRITTEN, COMBINE_OUTPUT_RECORDS, + COMBINE_INPUT_RECORDS, REDUCE_INPUT_GROUPS, REDUCE_INPUT_RECORDS, REDUCE_OUTPUT_RECORDS, + MAP_INPUT_RECORDS, MAP_OUTPUT_RECORDS, 
MAP_INPUT_BYTES, MAP_OUTPUT_BYTES, MAP_HDFS_BYTES_WRITTEN, + JOBCONF, JOB_PRIORITY, SHUFFLE_BYTES, SPILLED_RECORDS + } + + public static void parseJobHistory(Configuration jobConf, JobInfo jobInfo, MRJobInfo value) { + value.job.clear(); + populateJob(jobInfo.getValues(), value.job); + value.mapTask.clear(); + value.reduceTask.clear(); + populateMapReduceTaskLists(value, jobInfo.getAllTasks()); + } + + private static void populateJob (Map<JobHistory.Keys, String> jobC, Map<String, String> job) { + int size = jobC.size(); + Iterator<Map.Entry<JobHistory.Keys, String>> kv = jobC.entrySet().iterator(); + for (int i = 0; i < size; i++) { + Map.Entry<JobHistory.Keys, String> entry = (Map.Entry<JobHistory.Keys, String>) kv.next(); + JobHistory.Keys key = entry.getKey(); + String value = entry.getValue(); + switch (key) { + case JOBTRACKERID: job.put(JobKeys.JOBTRACKERID.toString(), value); break; + case FINISH_TIME: job.put(JobKeys.FINISH_TIME.toString(), value); break; + case JOBID: job.put(JobKeys.JOBID.toString(), value); break; + case JOBNAME: job.put(JobKeys.JOBNAME.toString(), value); break; + case USER: job.put(JobKeys.USER.toString(), value); break; + case JOBCONF: job.put(JobKeys.JOBCONF.toString(), value); break; + case SUBMIT_TIME: job.put(JobKeys.SUBMIT_TIME.toString(), value); break; + case LAUNCH_TIME: job.put(JobKeys.LAUNCH_TIME.toString(), value); break; + case TOTAL_MAPS: job.put(JobKeys.TOTAL_MAPS.toString(), value); break; + case TOTAL_REDUCES: job.put(JobKeys.TOTAL_REDUCES.toString(), value); break; + case FAILED_MAPS: job.put(JobKeys.FAILED_MAPS.toString(), value); break; + case FAILED_REDUCES: job.put(JobKeys.FAILED_REDUCES.toString(), value); break; + case FINISHED_MAPS: job.put(JobKeys.FINISHED_MAPS.toString(), value); break; + case FINISHED_REDUCES: job.put(JobKeys.FINISHED_REDUCES.toString(), value); break; + case JOB_STATUS: job.put(JobKeys.STATUS.toString(), value); break; + case COUNTERS: + value.concat(","); + parseAndAddJobCounters(job, value); + break; + default: + LOG.debug("JobHistory.Keys."+ key + " : NOT INCLUDED IN LOADER RETURN VALUE"); + break; + } + } + } + + /* + * Parse and add the job counters + */ + @SuppressWarnings("deprecation") + private static void parseAndAddJobCounters(Map<String, String> job, String counters) { + try { + Counters counterGroups = Counters.fromEscapedCompactString(counters); + for (Group otherGroup : counterGroups) { + Group group = counterGroups.getGroup(otherGroup.getName()); + for (Counter otherCounter : otherGroup) { + Counter counter = group.getCounterForName(otherCounter.getName()); + job.put(otherCounter.getName(), String.valueOf(counter.getValue())); + } + } + } catch (ParseException e) { + LOG.warn("Failed to parse job counters", e); + } + } + + @SuppressWarnings("deprecation") + private static void populateMapReduceTaskLists (MRJobInfo value, + Map<String, JobHistory.Task> taskMap) { + + Map<String, String> mapT = value.mapTask; + Map<String, String> reduceT = value.reduceTask; + long minMapRows = Long.MAX_VALUE; + long maxMapRows = 0; + long minMapTime = Long.MAX_VALUE; + long maxMapTime = 0; + long avgMapTime = 0; + long totalMapTime = 0; + int numberMaps = 0; + + long minReduceRows = Long.MAX_VALUE; + long maxReduceRows = 0; + long minReduceTime = Long.MAX_VALUE; + long maxReduceTime = 0; + long avgReduceTime = 0; + long totalReduceTime = 0; + int numberReduces = 0; + + int num_tasks = taskMap.entrySet().size(); + Iterator<Map.Entry<String, JobHistory.Task>> ti = taskMap.entrySet().iterator(); + for (int i = 0; i < 
num_tasks; i++) { + Map.Entry<String, JobHistory.Task> entry = (Map.Entry<String, JobHistory.Task>) ti.next(); + JobHistory.Task task = entry.getValue(); + if (task.get(Keys.TASK_TYPE).equals("MAP")) { + Map<JobHistory.Keys, String> mapTask = task.getValues(); + Map<JobHistory.Keys, String> successTaskAttemptMap = getLastSuccessfulTaskAttempt(task); + // NOTE: Following would lead to less number of actual tasks collected in the tasklist array + if (successTaskAttemptMap != null) { + mapTask.putAll(successTaskAttemptMap); + } else { + LOG.info("Task:<" + task.get(Keys.TASKID) + "> is not successful - SKIPPING"); + } + long duration = 0; + long startTime = 0; + long endTime = 0; + int size = mapTask.size(); + numberMaps++; + Iterator<Map.Entry<JobHistory.Keys, String>> kv = mapTask.entrySet().iterator(); + for (int j = 0; j < size; j++) { + Map.Entry<JobHistory.Keys, String> mtc = kv.next(); + JobHistory.Keys key = mtc.getKey(); + String val = mtc.getValue(); + switch (key) { + case START_TIME: + startTime = Long.valueOf(val); + break; + case FINISH_TIME: + endTime = Long.valueOf(val); + break; + case COUNTERS: { + try { + Counters counters = Counters.fromEscapedCompactString(val); + long rows = counters.getGroup(TASK_COUNTER_GROUP) + .getCounterForName(MAP_INPUT_RECORDS).getCounter(); + if (rows < minMapRows) minMapRows = rows; + if (rows > maxMapRows) maxMapRows = rows; + } catch (ParseException e) { + LOG.warn("Failed to parse job counters", e); + } + } + break; + default: + LOG.warn("JobHistory.Keys." + key + + " : NOT INCLUDED IN PERFORMANCE ADVISOR MAP COUNTERS"); + break; + } + } + duration = endTime - startTime; + if (minMapTime > duration) minMapTime = duration; + if (maxMapTime < duration) maxMapTime = duration; + totalMapTime += duration; + } else if (task.get(Keys.TASK_TYPE).equals("REDUCE")) { + Map<JobHistory.Keys, String> reduceTask = task.getValues(); + Map<JobHistory.Keys, String> successTaskAttemptMap = getLastSuccessfulTaskAttempt(task); + // NOTE: Following would lead to less number of actual tasks collected in the tasklist array + if (successTaskAttemptMap != null) { + reduceTask.putAll(successTaskAttemptMap); + } else { + LOG.warn("Task:<" + task.get(Keys.TASKID) + "> is not successful - SKIPPING"); + } + long duration = 0; + long startTime = 0; + long endTime = 0; + int size = reduceTask.size(); + numberReduces++; + + Iterator<Map.Entry<JobHistory.Keys, String>> kv = reduceTask.entrySet().iterator(); + for (int j = 0; j < size; j++) { + Map.Entry<JobHistory.Keys, String> rtc = kv.next(); + JobHistory.Keys key = rtc.getKey(); + String val = rtc.getValue(); + switch (key) { + case START_TIME: + startTime = Long.valueOf(val); + break; + case FINISH_TIME: + endTime = Long.valueOf(val); + break; + case COUNTERS: { + try { + Counters counters = Counters.fromEscapedCompactString(val); + long rows = counters.getGroup(TASK_COUNTER_GROUP) + .getCounterForName(REDUCE_INPUT_RECORDS).getCounter(); + if (rows < minReduceRows) minReduceRows = rows; + if (rows > maxReduceRows) maxReduceRows = rows; + } catch (ParseException e) { + LOG.warn("Failed to parse job counters", e); + } + } + break; + default: + LOG.warn("JobHistory.Keys." 
+ key + + " : NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE COUNTERS"); + break; + } + } + + duration = endTime - startTime; + if (minReduceTime > duration) minReduceTime = duration; + if (maxReduceTime < duration) maxReduceTime = duration; + totalReduceTime += duration; + + } else if (task.get(Keys.TASK_TYPE).equals("CLEANUP")) { + LOG.info("IGNORING TASK TYPE : " + task.get(Keys.TASK_TYPE)); + } else { + LOG.warn("UNKNOWN TASK TYPE : " + task.get(Keys.TASK_TYPE)); + } + } + if (numberMaps > 0) { + avgMapTime = (totalMapTime / numberMaps); + mapT.put("MIN_MAP_TIME", String.valueOf(minMapTime)); + mapT.put("MAX_MAP_TIME", String.valueOf(maxMapTime)); + mapT.put("MIN_MAP_INPUT_ROWS", String.valueOf(minMapRows)); + mapT.put("MAX_MAP_INPUT_ROWS", String.valueOf(maxMapRows)); + mapT.put("AVG_MAP_TIME", String.valueOf(avgMapTime)); + mapT.put("NUMBER_MAPS", String.valueOf(numberMaps)); + } + if (numberReduces > 0) { + avgReduceTime = (totalReduceTime /numberReduces); + reduceT.put("MIN_REDUCE_TIME", String.valueOf(minReduceTime)); + reduceT.put("MAX_REDUCE_TIME", String.valueOf(maxReduceTime)); + reduceT.put("AVG_REDUCE_TIME", String.valueOf(avgReduceTime)); + reduceT.put("MIN_REDUCE_INPUT_ROWS", String.valueOf(minReduceTime)); + reduceT.put("MAX_REDUCE_INPUT_ROWS", String.valueOf(maxReduceTime)); + reduceT.put("NUMBER_REDUCES", String.valueOf(numberReduces)); + } else { + reduceT.put("NUMBER_REDUCES", String.valueOf(0)); + } + } + + /* + * Get last successful task attempt to be added in the stats + */ + private static Map<JobHistory.Keys, String> getLastSuccessfulTaskAttempt( + JobHistory.Task task) { + + Map<String, JobHistory.TaskAttempt> taskAttempts = task + .getTaskAttempts(); + int size = taskAttempts.size(); + Iterator<Map.Entry<String, JobHistory.TaskAttempt>> kv = taskAttempts + .entrySet().iterator(); + for (int i = 0; i < size; i++) { + // CHECK_IT: Only one SUCCESSFUL TASK ATTEMPT + Map.Entry<String, JobHistory.TaskAttempt> tae = kv.next(); + JobHistory.TaskAttempt attempt = tae.getValue(); + if (attempt.getValues().get(JobHistory.Keys.TASK_STATUS).equals( + "SUCCESS")) { + return attempt.getValues(); + } + } + + return null; + } + + //------------------------------------------------------------------------- + /* + * Job xml keys + */ + private static final Map<String, String> XML_KEYS; + + static { + XML_KEYS = new HashMap<String, String> (); + XML_KEYS.put("group.name", "USER_GROUP"); + XML_KEYS.put("user.name", "USER"); + XML_KEYS.put("user.dir", "HOST_DIR"); + XML_KEYS.put("mapred.job.queue.name", "QUEUE_NAME"); + XML_KEYS.put("cluster", "CLUSTER"); + XML_KEYS.put("jobName", "JOB_NAME"); + XML_KEYS.put("pig.script.id", "PIG_SCRIPT_ID"); + XML_KEYS.put("pig.script", "PIG_SCRIPT"); + XML_KEYS.put("pig.hadoop.version", "HADOOP_VERSION"); + XML_KEYS.put("pig.version", "PIG_VERSION"); + XML_KEYS.put("pig.job.feature", "PIG_JOB_FEATURE"); + XML_KEYS.put("pig.alias", "PIG_JOB_ALIAS"); + XML_KEYS.put("pig.parent.jobid", "PIG_JOB_PARENTS"); + XML_KEYS.put("pig.host", "HOST_NAME"); + } + + public static Map<String, String> parseJobXML(InputStream in) { + + HashMap<String, String> xmlMap = new HashMap<String, String>(); + + try { + SAXParserFactory.newInstance().newSAXParser().parse(in, + new JobXMLHandler(xmlMap)); + } catch (Exception e) { + LOG.warn("Failed to parser job xml", e); + } + + return xmlMap; + } + + private static class JobXMLHandler extends DefaultHandler { + + private static final String NAME = "name"; + private static final String VALUE = "value"; + + private static 
Stack<String> tags = new Stack<String>(); + + private static String curTag; + + private static String key; + + private static Map<String, String> xmlMap; + + public JobXMLHandler(Map<String, String> xml) { + xmlMap = xml; + } + + @Override + public void startElement(String uri, String localName, + String qName, Attributes attributes) throws SAXException { + tags.add(qName); + curTag = qName; + } + + @Override + public void endElement(String uri, String localName, + String qName) throws SAXException { + String tag = tags.pop(); + if (tag == null || !tag.equalsIgnoreCase(qName)) { + throw new SAXException("Malformatted XML file: " + tag + " : " + + qName); + } + curTag = null; + } + + public void characters(char[] ch, int start, int length) + throws SAXException { + if (tags.size() > 1) { + String s = new String(ch, start, length); + if (curTag.equalsIgnoreCase(NAME)) { + key = s; + } + if (curTag.equalsIgnoreCase(VALUE)) { + String displayKey = XML_KEYS.get(key); + if (displayKey != null) { + xmlMap.put(displayKey, s); + } + } + } + } + } +} Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/localhost_1272395783545_job_201004271216_9998_conf.xml URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/localhost_1272395783545_job_201004271216_9998_conf.xml?rev=990223&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/localhost_1272395783545_job_201004271216_9998_conf.xml (added) +++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/localhost_1272395783545_job_201004271216_9998_conf.xml Fri Aug 27 18:32:35 2010 @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration> +<property><name>pig.script.features</name><value>512</value></property> +<property><name>mapred.job.name</name><value>PigLatin:Test.pig</value></property> +<property><name>user.dir</name><value>/homes/test</value></property> +<property><name>pig.version</name><value>0.8.0-dev</value></property> +<property><name>os.arch</name><value>i386</value></property> +<property><name>user.name</name><value>test</value></property> +<property><name>pig.alias</name><value>d</value></property> +<property><name>hadoop.job.history.user.location</name><value>/mapred/history</value></property> +<property><name>user.country</name><value>US</value></property> +<property><name>pig.script</name><value>YSA9IGxvYWQgJy91c2VyL3BpZy90ZXN0cy9kYXRhL3NpbmdsZWZpbGUvc3R1ZGVudHRhYjEwayc7YiA9IGxvYWQgJy91c2VyL3BpZy90ZXN0cy9kYXRhL3NpbmdsZWZpbGUvdm90ZXJ0YWIxMGsnO2MgPSBvcmRlciBhIGJ5ICQwO2QgPSBvcmRlciBiIGJ5ICQwO3N0b3JlIGMgaW50byAnL3VzZXIvcGlnL291dC9qaWFueW9uZy4xMjgyMTE3OTMzL01lcmdlSm9pbl8zLm91dC5pbnRlcm1lZGlhdGUxJztzdG9yZSBkIGludG8gJy91c2VyL3BpZy9vdXQvamlhbnlvbmcuMTI4MjExNzkzMy9NZXJnZUpvaW5fMy5vdXQuaW50ZXJtZWRpYXRlMic7ZXhlYztlID0gbG9hZCAnL3VzZXIvcGlnL291dC9qaWFueW9uZy4xMjgyMTE3OTMzL01lcmdlSm9pbl8zLm91dC5pbnRlcm1lZGlhdGUxJztmID0gbG9hZCAnL3VzZXIvcGlnL291dC9qaWFueW9uZy4xMjgyMTE3OTMzL01lcmdlSm9pbl8zLm91dC5pbnRlcm1lZGlhdGUyJztpID0gZmlsdGVyIGYgYnkgJDIgIT0gJ2RlbW9jcmF0JztnID0gam9pbiBlIGJ5ICQwLCBpIGJ5ICQwIHVzaW5nICJtZXJnZSI7c3RvcmUgZyBpbnRvICcvdXNlci9waWcvb3V0L2ppYW55b25nLjEyODIxMTc5MzMvTWVyZ2VKb2luXzMub3V0Jzs=</value></property> +<property><name>pig.hadoop.version</name><value>0.20.2</value></property> 
+<property><name>pig.script.id</name><value>3eb62180-5473-4301-aa22-467bd685d466</value></property> +<property><name>mapred.reduce.tasks</name><value>1</value></property> +<property><name>mapred.queue.names</name><value>default</value></property> +<property><name>exectype</name><value>mapreduce</value></property> +<property><name>user.language</name><value>en</value></property> +<property><name>pig.command.line</name><value>-logfile /out/pigtest/test.1282117933/Test.log /out/pigtest/test.1282117933/Test.pig</value></property> +<property><name>jobName</name><value>PigLatin:Test.pig</value></property> +<property><name>pig.quantilesFile</name><value>pigsample_25615779_1282127657849</value></property> +<property><name>debug</name><value>INFO</value></property> +<property><name>mapred.map.tasks</name><value>1</value></property> +<property><name>user.home</name><value>/homes/test</value></property> +<property><name>pig.job.feature</name><value>ORDER_BY</value></property> +<property><name>pig.reduce.key.type</name><value>50</value></property> +<property><name>pig.parent.jobid</name><value>job_201004271216_9995</value></property> +<property><name>group.name</name><value>users</value></property> +<property><name>opt.multiquery</name><value>true</value></property> +<property><name>verbose</name><value>false</value></property> +<property><name>pig.default.replication.factor</name><value>1</value></property> +<property><name>cluster</name><value>localhost:10000</value></property> +<property><name>mapred.mapper.new-api</name><value>true</value></property> +<property><name>user.timezone</name><value>America/Los_Angeles</value></property> +<property><name>mapred.reduce.max.attempts</name><value>4</value></property> +<property><name>file.encoding</name><value>UTF-8</value></property> +</configuration> Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/localhost_1272395783545_job_201004271216_9998_user_PigLatinTest.pig URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/localhost_1272395783545_job_201004271216_9998_user_PigLatinTest.pig?rev=990223&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/localhost_1272395783545_job_201004271216_9998_user_PigLatinTest.pig (added) +++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/localhost_1272395783545_job_201004271216_9998_user_PigLatinTest.pig Fri Aug 27 18:32:35 2010 @@ -0,0 +1,22 @@ +Meta VERSION="1" . +Job JOBID="job_201004271216_9998" JOBNAME="PigLatin:Test\.pig" USER="test" SUBMIT_TIME="1282127662662" JOBCONF="hdfs://localhost:10000/mapredsystem/hadoop/mapredsystem/job_201004271216_9998/job\.xml" . +Job JOBID="job_201004271216_9998" JOB_PRIORITY="NORMAL" . +Job JOBID="job_201004271216_9998" LAUNCH_TIME="1282127662963" TOTAL_MAPS="1" TOTAL_REDUCES="1" JOB_STATUS="PREP" . +Task TASKID="task_201004271216_9998_m_000002" TASK_TYPE="SETUP" START_TIME="1282127663890" SPLITS="" . +MapAttempt TASK_TYPE="SETUP" TASKID="task_201004271216_9998_m_000002" TASK_ATTEMPT_ID="attempt_201004271216_9998_m_000002_0" START_TIME="1282127664437" TRACKER_NAME="tracker_wilbur14\.labs\.corp\.sp1\.yahoo\.com:localhost\.localdomain/127\.0\.0\.1:46026" HTTP_PORT="60160" . 
+MapAttempt TASK_TYPE="SETUP" TASKID="task_201004271216_9998_m_000002" TASK_ATTEMPT_ID="attempt_201004271216_9998_m_000002_0" TASK_STATUS="SUCCESS" FINISH_TIME="1282127665818" HOSTNAME="/default-rack/wilbur14\.labs\.corp\.sp1\.yahoo\.com" STATE_STRING="setup" COUNTERS="{(org\.apache\.hadoop\.mapred\.Task$Counter)(Map-Reduce Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}" . +Task TASKID="task_201004271216_9998_m_000002" TASK_TYPE="SETUP" TASK_STATUS="SUCCESS" FINISH_TIME="1282127666954" COUNTERS="{(org\.apache\.hadoop\.mapred\.Task$Counter)(Map-Reduce Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}" . +Job JOBID="job_201004271216_9998" JOB_STATUS="RUNNING" . +Task TASKID="task_201004271216_9998_m_000000" TASK_TYPE="MAP" START_TIME="1282127666955" SPLITS="/default-rack/wilbur15\.labs\.corp\.sp1\.yahoo\.com,/default-rack/wilbur12\.labs\.corp\.sp1\.yahoo\.com,/default-rack/wilbur10\.labs\.corp\.sp1\.yahoo\.com" . +MapAttempt TASK_TYPE="MAP" TASKID="task_201004271216_9998_m_000000" TASK_ATTEMPT_ID="attempt_201004271216_9998_m_000000_0" START_TIME="1282127667085" TRACKER_NAME="tracker_wilbur14\.labs\.corp\.sp1\.yahoo\.com:localhost\.localdomain/127\.0\.0\.1:46026" HTTP_PORT="60160" . +MapAttempt TASK_TYPE="MAP" TASKID="task_201004271216_9998_m_000000" TASK_ATTEMPT_ID="attempt_201004271216_9998_m_000000_0" TASK_STATUS="SUCCESS" FINISH_TIME="1282127669505" HOSTNAME="/default-rack/wilbur14\.labs\.corp\.sp1\.yahoo\.com" STATE_STRING="" COUNTERS="{(FileSystemCounters)(FileSystemCounters)[(FILE_BYTES_READ)(FILE_BYTES_READ)(73)][(HDFS_BYTES_READ)(HDFS_BYTES_READ)(338211)][(FILE_BYTES_WRITTEN)(FILE_BYTES_WRITTEN)(478249)]}{(org\.apache\.hadoop\.mapred\.Task$Counter)(Map-Reduce Framework)[(COMBINE_OUTPUT_RECORDS)(Combine output records)(0)][(MAP_INPUT_RECORDS)(Map input records)(10000)][(SPILLED_RECORDS)(Spilled Records)(10000)][(MAP_OUTPUT_BYTES)(Map output bytes)(458211)][(COMBINE_INPUT_RECORDS)(Combine input records)(0)][(MAP_OUTPUT_RECORDS)(Map output records)(10000)]}" . +Task TASKID="task_201004271216_9998_m_000000" TASK_TYPE="MAP" TASK_STATUS="SUCCESS" FINISH_TIME="1282127669960" COUNTERS="{(FileSystemCounters)(FileSystemCounters)[(FILE_BYTES_READ)(FILE_BYTES_READ)(73)][(HDFS_BYTES_READ)(HDFS_BYTES_READ)(338211)][(FILE_BYTES_WRITTEN)(FILE_BYTES_WRITTEN)(478249)]}{(org\.apache\.hadoop\.mapred\.Task$Counter)(Map-Reduce Framework)[(COMBINE_OUTPUT_RECORDS)(Combine output records)(0)][(MAP_INPUT_RECORDS)(Map input records)(10000)][(SPILLED_RECORDS)(Spilled Records)(10000)][(MAP_OUTPUT_BYTES)(Map output bytes)(458211)][(COMBINE_INPUT_RECORDS)(Combine input records)(0)][(MAP_OUTPUT_RECORDS)(Map output records)(10000)]}" . +Task TASKID="task_201004271216_9998_r_000000" TASK_TYPE="REDUCE" START_TIME="1282127669961" SPLITS="" . +ReduceAttempt TASK_TYPE="REDUCE" TASKID="task_201004271216_9998_r_000000" TASK_ATTEMPT_ID="attempt_201004271216_9998_r_000000_0" START_TIME="1282127670026" TRACKER_NAME="tracker_wilbur14\.labs\.corp\.sp1\.yahoo\.com:localhost\.localdomain/127\.0\.0\.1:46026" HTTP_PORT="60160" . 
+ReduceAttempt TASK_TYPE="REDUCE" TASKID="task_201004271216_9998_r_000000" TASK_ATTEMPT_ID="attempt_201004271216_9998_r_000000_0" TASK_STATUS="SUCCESS" SHUFFLE_FINISHED="1282127677253" SORT_FINISHED="1282127677447" FINISH_TIME="1282127679044" HOSTNAME="/default-rack/wilbur14\.labs\.corp\.sp1\.yahoo\.com" STATE_STRING="reduce > reduce" COUNTERS="{(FileSystemCounters)(FileSystemCounters)[(FILE_BYTES_READ)(FILE_BYTES_READ)(478217)][(FILE_BYTES_WRITTEN)(FILE_BYTES_WRITTEN)(478217)][(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(338211)]}{(org\.apache\.hadoop\.mapred\.Task$Counter)(Map-Reduce Framework)[(REDUCE_INPUT_GROUPS)(Reduce input groups)(676)][(COMBINE_OUTPUT_RECORDS)(Combine output records)(0)][(REDUCE_SHUFFLE_BYTES)(Reduce shuffle bytes)(0)][(REDUCE_OUTPUT_RECORDS)(Reduce output records)(10000)][(SPILLED_RECORDS)(Spilled Records)(10000)][(COMBINE_INPUT_RECORDS)(Combine input records)(0)][(REDUCE_INPUT_RECORDS)(Reduce input records)(10000)]}" . +Task TASKID="task_201004271216_9998_r_000000" TASK_TYPE="REDUCE" TASK_STATUS="SUCCESS" FINISH_TIME="1282127681977" COUNTERS="{(FileSystemCounters)(FileSystemCounters)[(FILE_BYTES_READ)(FILE_BYTES_READ)(478217)][(FILE_BYTES_WRITTEN)(FILE_BYTES_WRITTEN)(478217)][(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(338211)]}{(org\.apache\.hadoop\.mapred\.Task$Counter)(Map-Reduce Framework)[(REDUCE_INPUT_GROUPS)(Reduce input groups)(676)][(COMBINE_OUTPUT_RECORDS)(Combine output records)(0)][(REDUCE_SHUFFLE_BYTES)(Reduce shuffle bytes)(0)][(REDUCE_OUTPUT_RECORDS)(Reduce output records)(10000)][(SPILLED_RECORDS)(Spilled Records)(10000)][(COMBINE_INPUT_RECORDS)(Combine input records)(0)][(REDUCE_INPUT_RECORDS)(Reduce input records)(10000)]}" . +Task TASKID="task_201004271216_9998_m_000001" TASK_TYPE="CLEANUP" START_TIME="1282127681977" SPLITS="" . +MapAttempt TASK_TYPE="CLEANUP" TASKID="task_201004271216_9998_m_000001" TASK_ATTEMPT_ID="attempt_201004271216_9998_m_000001_0" START_TIME="1282127682043" TRACKER_NAME="tracker_wilbur14\.labs\.corp\.sp1\.yahoo\.com:localhost\.localdomain/127\.0\.0\.1:46026" HTTP_PORT="60160" . +MapAttempt TASK_TYPE="CLEANUP" TASKID="task_201004271216_9998_m_000001" TASK_ATTEMPT_ID="attempt_201004271216_9998_m_000001_0" TASK_STATUS="SUCCESS" FINISH_TIME="1282127683242" HOSTNAME="/default-rack/wilbur14\.labs\.corp\.sp1\.yahoo\.com" STATE_STRING="cleanup" COUNTERS="{(org\.apache\.hadoop\.mapred\.Task$Counter)(Map-Reduce Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}" . +Task TASKID="task_201004271216_9998_m_000001" TASK_TYPE="CLEANUP" TASK_STATUS="SUCCESS" FINISH_TIME="1282127684981" COUNTERS="{(org\.apache\.hadoop\.mapred\.Task$Counter)(Map-Reduce Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}" . 
+Job JOBID="job_201004271216_9998" FINISH_TIME="1282127684981" JOB_STATUS="SUCCESS" FINISHED_MAPS="1" FINISHED_REDUCES="1" FAILED_MAPS="0" FAILED_REDUCES="0" COUNTERS="{(org\.apache\.hadoop\.mapred\.JobInProgress$Counter)(Job Counters )[(TOTAL_LAUNCHED_REDUCES)(Launched reduce tasks)(1)][(RACK_LOCAL_MAPS)(Rack-local map tasks)(1)][(TOTAL_LAUNCHED_MAPS)(Launched map tasks)(1)]}{(FileSystemCounters)(FileSystemCounters)[(FILE_BYTES_READ)(FILE_BYTES_READ)(478290)][(HDFS_BYTES_READ)(HDFS_BYTES_READ)(338211)][(FILE_BYTES_WRITTEN)(FILE_BYTES_WRITTEN)(956466)][(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(338211)]}{(org\.apache\.hadoop\.mapred\.Task$Counter)(Map-Reduce Framework)[(REDUCE_INPUT_GROUPS)(Reduce input groups)(676)][(COMBINE_OUTPUT_RECORDS)(Combine output records)(0)][(MAP_INPUT_RECORDS)(Map input records)(10000)][(REDUCE_SHUFFLE_BYTES)(Reduce shuffle bytes)(0)][(REDUCE_OUTPUT_RECORDS)(Reduce output records)(10000)][(SPILLED_RECORDS)(Spilled Records)(20000)][(MAP_OUTPUT_BYTES) (Map output bytes)(458211)][(MAP_OUTPUT_RECORDS)(Map output records)(10000)][(COMBINE_INPUT_RECORDS)(Combine input records)(0)][(REDUCE_INPUT_RECORDS)(Reduce input records)(10000)]}" . Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHadoopJobHistoryLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHadoopJobHistoryLoader.java?rev=990223&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHadoopJobHistoryLoader.java (added) +++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHadoopJobHistoryLoader.java Fri Aug 27 18:32:35 2010 @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.pig.piggybank.test.storage;
+
+
+import static org.junit.Assert.*;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHadoopJobHistoryLoader {
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+    private static final String INPUT_DIR =
+        "src/test/java/org/apache/pig/piggybank/test/data/jh";
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testHadoopJHLoader() throws Exception {
+        PigServer pig = new PigServer(ExecType.LOCAL);
+        pig.registerQuery("a = load '" + INPUT_DIR +
+            "' using org.apache.pig.piggybank.storage.HadoopJobHistoryLoader() " +
+            "as (j:map[], m:map[], r:map[]);");
+        Iterator<Tuple> iter = pig.openIterator("a");
+
+        assertTrue(iter.hasNext());
+
+        Tuple t = iter.next();
+
+        Map<String, Object> job = (Map<String, Object>)t.get(0);
+
+        assertEquals("3eb62180-5473-4301-aa22-467bd685d466", (String)job.get("PIG_SCRIPT_ID"));
+        assertEquals("job_201004271216_9998", (String)job.get("JOBID"));
+        assertEquals("job_201004271216_9995", (String)job.get("PIG_JOB_PARENTS"));
+        assertEquals("0.8.0-dev", (String)job.get("PIG_VERSION"));
+        assertEquals("0.20.2", (String)job.get("HADOOP_VERSION"));
+        assertEquals("d", (String)job.get("PIG_JOB_ALIAS"));
+        assertEquals("PigLatin:Test.pig", job.get("JOBNAME"));
+        assertEquals("ORDER_BY", (String)job.get("PIG_JOB_FEATURE"));
+        assertEquals("1", (String)job.get("TOTAL_MAPS"));
+        assertEquals("1", (String)job.get("TOTAL_REDUCES"));
+    }
+}
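The test above only checks the job-level map (the first field of the returned tuple). The loader also fills the second and third fields with aggregated task statistics, written by populateMapReduceTaskLists under keys such as NUMBER_MAPS, MIN_MAP_TIME, MAX_MAP_TIME, AVG_MAP_TIME, MIN_MAP_INPUT_ROWS, MAX_MAP_INPUT_ROWS and NUMBER_REDUCES. A follow-on sketch that reads those aggregates back, reusing the test's local-mode setup and test data directory, might look like the following; it assumes the values arrive as the String representations the loader stores, and the report line is purely illustrative.

    import java.util.Iterator;
    import java.util.Map;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class JobHistoryTaskStatsExample {
        @SuppressWarnings("unchecked")
        public static void main(String[] args) throws Exception {
            // Same local-mode setup and job-history test data as TestHadoopJobHistoryLoader.
            String inputDir = "src/test/java/org/apache/pig/piggybank/test/data/jh";
            PigServer pig = new PigServer(ExecType.LOCAL);
            pig.registerQuery("a = load '" + inputDir
                    + "' using org.apache.pig.piggybank.storage.HadoopJobHistoryLoader() "
                    + "as (j:map[], m:map[], r:map[]);");
            Iterator<Tuple> iter = pig.openIterator("a");
            while (iter.hasNext()) {
                Tuple t = iter.next();
                Map<String, Object> job = (Map<String, Object>) t.get(0);         // job-level stats
                Map<String, Object> mapStats = (Map<String, Object>) t.get(1);    // aggregated map-task stats
                Map<String, Object> reduceStats = (Map<String, Object>) t.get(2); // aggregated reduce-task stats

                // The test data contains one map and one reduce task, so these keys are present.
                long maps = Long.parseLong((String) mapStats.get("NUMBER_MAPS"));
                long avgMapTime = Long.parseLong((String) mapStats.get("AVG_MAP_TIME"));
                long reduces = Long.parseLong((String) reduceStats.get("NUMBER_REDUCES"));

                System.out.println(job.get("JOBID") + ": " + maps + " map(s), avg map time "
                        + avgMapTime + " ms, " + reduces + " reduce(s)");
            }
        }
    }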