Author: olga
Date: Thu May 22 12:36:29 2008
New Revision: 659223

URL: http://svn.apache.org/viewvc?rev=659223&view=rev
Log:
PIG-198: integration with hadoop 0.17
Added: incubator/pig/trunk/lib/hadoop17.jar (with props) Removed: incubator/pig/trunk/lib/hadoop15.jar Modified: incubator/pig/trunk/CHANGES.txt incubator/pig/trunk/build.xml incubator/pig/trunk/src/org/apache/pig/backend/datastorage/ElementDescriptor.java incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigOutputFormat.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SortPartitioner.java incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Modified: incubator/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=659223&r1=659222&r2=659223&view=diff ============================================================================== --- incubator/pig/trunk/CHANGES.txt (original) +++ incubator/pig/trunk/CHANGES.txt Thu May 22 12:36:29 2008 @@ -295,3 +295,5 @@ PIG-237: validation of the output directory (pi_song via olgan) PIG-236: Fix properties so that values specified via the command line (-D) are not ignored (pkamath via gates). 
+ + PIG-198: integration with hadoop 17 Modified: incubator/pig/trunk/build.xml URL: http://svn.apache.org/viewvc/incubator/pig/trunk/build.xml?rev=659223&r1=659222&r2=659223&view=diff ============================================================================== --- incubator/pig/trunk/build.xml (original) +++ incubator/pig/trunk/build.xml Thu May 22 12:36:29 2008 @@ -42,7 +42,7 @@ <property name="dist.dir" value="${build.dir}/${final.name}" /> <property name="build.encoding" value="ISO-8859-1" /> <!-- TODO with only one version of hadoop in the lib folder we do not need that anymore --> - <property name="hadoop.jarfile" value="hadoop16.jar" /> + <property name="hadoop.jarfile" value="hadoop17.jar" /> <!-- jar names. TODO we might want to use the svn reversion name in the name in case it is a dev version --> <property name="output.jarfile" value="${build.dir}/${final.name}.jar" /> Added: incubator/pig/trunk/lib/hadoop17.jar URL: http://svn.apache.org/viewvc/incubator/pig/trunk/lib/hadoop17.jar?rev=659223&view=auto ============================================================================== Binary file - no diff available. 
Propchange: incubator/pig/trunk/lib/hadoop17.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Modified: incubator/pig/trunk/src/org/apache/pig/backend/datastorage/ElementDescriptor.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/datastorage/ElementDescriptor.java?rev=659223&r1=659222&r2=659223&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/datastorage/ElementDescriptor.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/datastorage/ElementDescriptor.java Thu May 22 12:36:29 2008 @@ -130,7 +130,14 @@ */ public void updateConfiguration(Properties newConfig) throws IOException; - + + /** + * Defines whether the element is visible to users or + * contains system's metadata + * @return true if this is system file; false otherwise + */ + public boolean systemElement(); + /** * List entity statistics * @return DataStorageProperties Modified: incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java?rev=659223&r1=659222&r2=659223&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java Thu May 22 12:36:29 2008 @@ -66,6 +66,10 @@ for (int j = 0; j < paths.size(); j++) { ElementDescriptor fullPath = store.asElement(store .getActiveContainer(), paths.get(j)); + // Skip hadoop's private/meta files ... 
+ if (fullPath.systemElement()) { + continue; + } if (fullPath instanceof ContainerDescriptor) { for (ElementDescriptor child : ((ContainerDescriptor) fullPath)) { paths.add(child); Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java?rev=659223&r1=659222&r2=659223&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java Thu May 22 12:36:29 2008 @@ -2,13 +2,16 @@ import java.io.IOException; import java.net.URI; +import java.util.ArrayList; import java.util.Enumeration; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.dfs.DistributedFileSystem; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.pig.backend.datastorage.ContainerDescriptor; @@ -183,15 +186,21 @@ public HPath[] asCollection(String pattern) throws DataStorageException { try { - Path[] paths = this.fs.globPaths(new Path(pattern)); - - HPath[] hpaths = new HPath[paths.length]; - - for (int i = 0; i < paths.length; ++i) { - hpaths[i] = ((HPath) this.asElement(paths[i].toString())); - } - - return hpaths; + FileStatus[] paths = this.fs.globStatus(new Path(pattern)); + + if (paths == null) + return new HPath[0]; + + List<HPath> hpaths = new ArrayList<HPath>(); + + for (int i = 0; i < paths.length; ++i) { + HPath hpath = (HPath)this.asElement(paths[i].getPath().toString()); + if (!hpath.systemElement()) { + hpaths.add(hpath); + } + } + + return hpaths.toArray(new HPath[hpaths.size()]); } catch (IOException e) { throw new 
DataStorageException("Failed to obtain glob for " + pattern, e); Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java?rev=659223&r1=659222&r2=659223&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java Thu May 22 12:36:29 2008 @@ -151,6 +151,12 @@ public FileSystem getHFS() { return fs.getHFS(); } + + public boolean systemElement() { + return (path != null && + (path.getName().startsWith("_") || + path.getName().startsWith("."))); + } @Override public String toString() { Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=659223&r1=659222&r2=659223&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu May 22 12:36:29 2008 @@ -11,6 +11,8 @@ import java.net.Socket; import java.net.SocketException; import java.net.SocketImplFactory; +import java.net.URI; +import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.Collection; import java.util.Enumeration; @@ -576,16 +578,33 @@ } private String fixUpDomain(String hostPort,Properties properties) throws UnknownHostException { - String parts[] = hostPort.split(":"); - if (parts[0].indexOf('.') == -1) { + URI uri = null; + try { + uri = new URI(hostPort); + } catch (URISyntaxException use) { + throw new RuntimeException("Illegal 
hostPort: " + hostPort); + } + + String hostname = uri.getHost(); + int port = uri.getPort(); + + // Parse manually if hostPort wasn't non-opaque URI + // e.g. hostPort is "myhost:myport" + if (hostname == null || port == -1) { + String parts[] = hostPort.split(":"); + hostname = parts[0]; + port = Integer.valueOf(parts[1]); + } + + if (hostname.indexOf('.') == -1) { //jz: fallback to systemproperty cause this not handled in Main String domain = properties.getProperty("cluster.domain",System.getProperty("cluster.domain")); if (domain == null) throw new RuntimeException("Missing cluster.domain property!"); - parts[0] = parts[0] + "." + domain; + hostname = hostname + "." + domain; } - InetAddress.getByName(parts[0]); - return parts[0] + ":" + parts[1]; + InetAddress.getByName(hostname); + return hostname + ":" + Integer.toString(port); } // create temp dir to store hod output; removed on exit Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java?rev=659223&r1=659222&r2=659223&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java Thu May 22 12:36:29 2008 @@ -40,7 +40,8 @@ import org.apache.pig.impl.util.ObjectSerializer; -public class PigCombine implements Reducer { +public class PigCombine implements + Reducer<Tuple, IndexedTuple, Tuple, IndexedTuple> { private final Log log = LogFactory.getLog(getClass()); @@ -53,7 +54,8 @@ private PigContext pigContext; private EvalSpec esp; - public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) + public void reduce(Tuple key, 
Iterator<IndexedTuple> values, + OutputCollector<Tuple, IndexedTuple> output, Reporter reporter) throws IOException { try { @@ -70,8 +72,8 @@ index = PigInputFormat.getActiveSplit().getIndex(); - Datum groupName = ((Tuple) key).getField(0); - finalout.group = ((Tuple) key); + Datum groupName = key.getField(0); + finalout.group = key; finalout.index = index; Tuple t = new Tuple(1 + inputCount); @@ -82,7 +84,7 @@ } while (values.hasNext()) { - IndexedTuple it = (IndexedTuple) values.next(); + IndexedTuple it = values.next(); t.getBagField(it.index + 1).add(it.toTuple()); } for (int i = 0; i < inputCount; i++) { // XXX: shouldn't we only do this if INNER flag is set? @@ -124,8 +126,9 @@ public void close() throws IOException { } - private static class CombineDataOutputCollector extends DataCollector { - OutputCollector oc = null; + private static class CombineDataOutputCollector + extends DataCollector { + OutputCollector<Tuple, IndexedTuple> oc = null; Tuple group = null; int index = -1; Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java?rev=659223&r1=659222&r2=659223&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java Thu May 22 12:36:29 2008 @@ -77,7 +77,8 @@ * * @author breed */ -public class PigMapReduce implements MapRunnable, Reducer { +public class PigMapReduce implements MapRunnable<WritableComparable, Tuple, WritableComparable, Writable>, + Reducer<Tuple, IndexedTuple, WritableComparable, Writable> { private final Log log = LogFactory.getLog(getClass()); @@ -100,7 +101,8 @@ * the tuples from our 
PigRecordReader (see ugly ThreadLocal hack), pipe the tuples through the * function pipeline and then close the writer. */ - public void run(RecordReader input, OutputCollector output, Reporter reporter) throws IOException { +public void run(RecordReader<WritableComparable, Tuple> input, + OutputCollector<WritableComparable, Writable> output, Reporter reporter) throws IOException { PigMapReduce.reporter = reporter; oc = output; @@ -110,9 +112,9 @@ // allocate key & value instances that are re-used for all entries WritableComparable key = input.createKey(); - Writable value = input.createValue(); + Tuple value = input.createValue(); while (input.next(key, value)) { - evalPipe.add((Tuple) value); + evalPipe.add(value); } } finally { try { @@ -130,7 +132,7 @@ } } - public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) + public void reduce(Tuple key, Iterator<IndexedTuple> values, OutputCollector<WritableComparable, Writable> output, Reporter reporter) throws IOException { PigMapReduce.reporter = reporter; @@ -142,7 +144,7 @@ } DataBag[] bags = new DataBag[inputCount]; - Datum groupName = ((Tuple) key).getField(0); + Datum groupName = key.getField(0); Tuple t = new Tuple(1 + inputCount); t.setField(0, groupName); for (int i = 1; i < 1 + inputCount; i++) { @@ -151,7 +153,7 @@ } while (values.hasNext()) { - IndexedTuple it = (IndexedTuple) values.next(); + IndexedTuple it = values.next(); t.getBagField(it.index + 1).add(it.toTuple()); } @@ -254,8 +256,8 @@ if (splitSpec == null){ pigWriter = (PigRecordWriter) job.getOutputFormat().getRecordWriter(FileSystem.get(job), job, fileName, reporter); - oc = new OutputCollector() { - public void collect(WritableComparable key, Writable value) throws IOException { + oc = new OutputCollector<WritableComparable, Tuple>() { + public void collect(WritableComparable key, Tuple value) throws IOException { pigWriter.write(key, value); } }; @@ -280,15 +282,14 @@ for (String name: 
splitSpec.tempFiles){ sideFileWriters.add( outputFormat.getRecordWriter(FileSystem.get(job), job, new Path(name), "split-" + getTaskId(), reporter)); } - return new OutputCollector(){ - public void collect(WritableComparable key, Writable value) throws IOException { - Tuple t = (Tuple) value; + return new OutputCollector<WritableComparable, Tuple>(){ + public void collect(WritableComparable key, Tuple value) throws IOException { ArrayList<Cond> conditions = splitSpec.conditions; for (int i=0; i< conditions.size(); i++){ Cond cond = conditions.get(i); - if (cond.eval(t)){ + if (cond.eval(value)){ //System.out.println("Writing " + t + " to condition " + cond); - sideFileWriters.get(i).write(null, t); + sideFileWriters.get(i).write(null, value); } } Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigOutputFormat.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigOutputFormat.java?rev=659223&r1=659222&r2=659223&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigOutputFormat.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigOutputFormat.java Thu May 22 12:36:29 2008 @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.mapred.RecordWriter; @@ -43,7 +44,7 @@ public RecordWriter getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progress) throws IOException { - Path outputDir = job.getOutputPath(); + Path outputDir = FileOutputFormat.getOutputPath(job); return getRecordWriter(fs, job, outputDir, name, 
progress); } @@ -56,12 +57,8 @@ } else { store = (StoreFunc) PigContext.instantiateFuncFromSpec(storeFunc); } - // The outputDir is going to be a temporary name we need to look at the - // real thing! - // XXX This is a wretched implementation dependent hack! Need to find - // out from the Hadoop guys if there is a better way. - String parentName = job.getOutputPath().getParent().getParent() - .getName(); + + String parentName = FileOutputFormat.getOutputPath(job).getName(); int suffixStart = parentName.lastIndexOf('.'); if (suffixStart != -1) { String suffix = parentName.substring(suffixStart); @@ -77,7 +74,7 @@ return; } - static public class PigRecordWriter implements RecordWriter { + static public class PigRecordWriter implements RecordWriter<WritableComparable, Tuple> { private OutputStream os = null; private StoreFunc sfunc = null; @@ -98,8 +95,8 @@ * @see org.apache.hadoop.mapred.RecordWriter#write(org.apache.hadoop.io.WritableComparable, * org.apache.hadoop.io.Writable) */ - public void write(WritableComparable key, Writable value) throws IOException { - this.sfunc.putNext((Tuple) value); + public void write(WritableComparable key, Tuple value) throws IOException { + this.sfunc.putNext(value); } public void close(Reporter reporter) throws IOException { Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SortPartitioner.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SortPartitioner.java?rev=659223&r1=659222&r2=659223&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SortPartitioner.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SortPartitioner.java Thu May 22 12:36:29 2008 @@ -23,8 +23,7 @@ import java.util.Arrays; import org.apache.hadoop.io.Writable; -import 
org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Partitioner; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; @@ -33,17 +32,15 @@ import org.apache.pig.impl.io.BufferedPositionedInputStream; import org.apache.pig.impl.io.FileLocalizer; - -public class SortPartitioner implements Partitioner { +public class SortPartitioner implements Partitioner<Tuple, Writable> { Tuple[] quantiles; - WritableComparator comparator; - - public int getPartition(WritableComparable key, Writable value, - int numPartitions) { - Tuple keyTuple = (Tuple)key; - int index = Arrays.binarySearch(quantiles, keyTuple.getTupleField(0), comparator); - if (index < 0) - index = -index-1; + RawComparator comparator; + + public int getPartition(Tuple key, Writable value, + int numPartitions) { + int index = Arrays.binarySearch(quantiles, key.getTupleField(0), comparator); + if (index < 0) + index = -index-1; return Math.min(index, numPartitions - 1); } Modified: incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java?rev=659223&r1=659222&r2=659223&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java Thu May 22 12:36:29 2008 @@ -136,6 +136,10 @@ public int compareTo(ElementDescriptor other) { return this.path.compareTo(((LocalPath)other).path); } + + public boolean systemElement(){ + return false; + } public String toString() { return this.path.toString(); Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=659223&r1=659222&r2=659223&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Thu May 22 12:36:29 2008 @@ -166,6 +166,8 @@ if (elem.exists()) { try { if(! elem.getDataStorage().isContainer(elem.toString())) { + if (elem.systemElement()) + throw new IOException ("Attempt is made to open system file " + elem.toString()); return elem.open(); } } @@ -183,7 +185,9 @@ ((ContainerDescriptor)elem).iterator(); while (allElements.hasNext()) { - arrayList.add(allElements.next()); + ElementDescriptor nextElement = allElements.next(); + if (!nextElement.systemElement()) + arrayList.add(nextElement); } elements = new ElementDescriptor[ arrayList.size() ];