Author: olga
Date: Tue Nov 20 10:27:05 2007
New Revision: 596765

URL: http://svn.apache.org/viewvc?rev=596765&view=rev
Log:
Changes to make pig work with hadoop 0.15; PIG-17

Added:
    incubator/pig/trunk/lib/hadoop15.jar   (with props)
Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/build.xml
    incubator/pig/trunk/src/org/apache/pig/PigServer.java
    incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java
    incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
    incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java
    incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigInputFormat.java
    incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.jj

Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=596765&r1=596764&r2=596765&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Tue Nov 20 10:27:05 2007
@@ -19,3 +19,5 @@
        PIG-23 Made pig work with java 1.5. (milindb via gates)
 
        PIG-8 added binary comparator (olgan)
+
+       PIG-17 integrated with Hadoop 0.15 (olgan@)

Modified: incubator/pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/build.xml?rev=596765&r1=596764&r2=596765&view=diff
==============================================================================
--- incubator/pig/trunk/build.xml (original)
+++ incubator/pig/trunk/build.xml Tue Nov 20 10:27:05 2007
@@ -10,7 +10,7 @@
        <property name="bzip2.src.dir" value="${basedir}/lib-src/bzip2" />
        <property name="test.src.dir" value="${basedir}/test" />
        <property name="output.jarfile" value="pig.jar" />
-       <property name="hadoop.jarfile" value="hadoop14.jar"/>
+       <property name="hadoop.jarfile" value="hadoop15.jar"/>
        <property name="ssh.gateway" value=""/>
        <property name="hod.server" value=""/>
        <property name="hod.command" value=""/>
@@ -53,12 +53,14 @@
        <target name="compile" depends="cc-compile, lib-compile">
                <mkdir dir="${dist.dir}" />
                <echo>*** Building Main Sources ***</echo>
-               <javac srcdir="${src.dir}" destdir="${dist.dir}" target="1.5" debug="on">
+               <javac srcdir="${src.dir};${shock.src.dir};${bzip2.src.dir}" destdir="${dist.dir}" target="1.5" debug="on" deprecation="on">
                        <classpath refid="classpath" />
+                       <compilerarg value="-Xlint:unchecked"/>
                </javac>
                <echo>*** Building Test Sources ***</echo>
                <javac srcdir="test" destdir="${dist.dir}" debug="on">
                        <classpath refid="classpath" />
+                       <compilerarg value="-Xlint:unchecked"/>
                </javac>
        </target>
 

Added: incubator/pig/trunk/lib/hadoop15.jar
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/lib/hadoop15.jar?rev=596765&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/pig/trunk/lib/hadoop15.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: incubator/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/PigServer.java?rev=596765&r1=596764&r2=596765&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/PigServer.java Tue Nov 20 10:27:05 2007
@@ -27,6 +27,7 @@
 import org.apache.hadoop.dfs.DistributedFileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -488,7 +489,7 @@
     public long fileSize(String filename) throws IOException {
         FileSystem dfs = pigContext.getDfs();
         Path p = new Path(filename);
-        long len = dfs.getLength(p);
+        long len = dfs.getFileStatus(p).getLen();
         long replication = dfs.getDefaultReplication(); // did not work, for some reason: dfs.getReplication(p);
         return len * replication;
     }
@@ -510,10 +511,10 @@
     }
     
     public String[] listPaths(String dir) throws IOException {
-        Path paths[] = pigContext.getDfs().listPaths(new Path(dir));
-        String strPaths[] = new String[paths.length];
-        for (int i = 0; i < paths.length; i++) {
-            strPaths[i] = paths[i].toString();
+        FileStatus stats[] = pigContext.getDfs().listStatus(new Path(dir));
+        String strPaths[] = new String[stats.length];
+        for (int i = 0; i < stats.length; i++) {
+            strPaths[i] = stats[i].getPath().toString();
         }
         return strPaths;
     }
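
The hunks above come down to two FileSystem calls that changed shape in
Hadoop 0.15: listPaths() gives way to listStatus(), and the per-path
getLength() gives way to getFileStatus().getLen(). A minimal standalone
sketch of the new calls (the class name and paths below are placeholders,
not anything from this commit):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ListingSketch {
        public static void main(String[] args) throws IOException {
            FileSystem fs = FileSystem.get(new Configuration());

            // listStatus() returns FileStatus objects instead of bare Paths,
            // so the Path has to be pulled out of each entry explicitly.
            FileStatus[] stats = fs.listStatus(new Path("some/dir"));
            for (FileStatus stat : stats) {
                System.out.println(stat.getPath());
            }

            // File length now comes from the FileStatus as well.
            long len = fs.getFileStatus(new Path("some/file")).getLen();
            System.out.println("bytes: " + len);
        }
    }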

Modified: incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=596765&r1=596764&r2=596765&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java Tue Nov 20 10:27:05 2007
@@ -42,6 +42,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.JobSubmissionProtocol;
 import org.apache.hadoop.mapred.JobTracker;
@@ -330,7 +331,7 @@
            {
                conf = new JobConf(hadoopConf);
                // make sure that files on class path are used
-               conf.addFinalResource("pig-cluster-hadoop-site.xml");
+               conf.addResource("pig-cluster-hadoop-site.xml");
                System.out.println("Job Conf = " + conf);
                System.out.println("dfs.block.size= " + conf.get("dfs.block.size"));
                System.out.println("ipc.client.timeout= " + conf.get("ipc.client.timeout"));
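
The configuration change above is a single-call swap on the job conf.
A hedged sketch, reusing the resource name from the hunk but with a
placeholder class name:

    import org.apache.hadoop.mapred.JobConf;

    public class ConfSketch {
        public static void main(String[] args) {
            JobConf conf = new JobConf();
            // addResource() layers an extra config file onto the job conf;
            // it stands in for the addFinalResource() call removed above.
            conf.addResource("pig-cluster-hadoop-site.xml");
            System.out.println("dfs.block.size = " + conf.get("dfs.block.size"));
        }
    }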

Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=596765&r1=596764&r2=596765&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Tue Nov 20 10:27:05 2007
@@ -32,6 +32,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.PigServer.ExecType;
 import org.apache.pig.impl.PigContext;
@@ -158,7 +159,10 @@
        Path paths[] = null;
        if (fs.exists(path)) {
                if (fs.isFile(path)) return fs.open(path);
-               paths = fs.listPaths(path);
+                       FileStatus fileStat[] = fs.listStatus(path);
+                       paths = new Path[fileStat.length];
+                       for (int i = 0; i < fileStat.length; i++)
+                       paths[i] = fileStat[i].getPath();
                } else {
                        // It might be a glob
                        if (!globMatchesFiles(path, paths, fs)) throw new IOException(path + " does not exist");

Modified: incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java?rev=596765&r1=596764&r2=596765&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java Tue Nov 20 10:27:05 2007
@@ -39,7 +39,7 @@
 import org.apache.pig.impl.util.ObjectSerializer;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskReport;
@@ -158,8 +158,8 @@
                conf.setReducerClass(PigMapReduce.class);
                conf.setInputFormat(PigInputFormat.class);
                conf.setOutputFormat(PigOutputFormat.class);
-               conf.setInputKeyClass(UTF8.class);
-               conf.setInputValueClass(Tuple.class);
+               // not used starting with 0.15 conf.setInputKeyClass(Text.class);
+               // not used starting with 0.15 conf.setInputValueClass(Tuple.class);
                conf.setOutputKeyClass(Tuple.class);
                conf.setOutputValueClass(IndexedTuple.class);
                conf.set("pig.inputs", ObjectSerializer.serialize(pom.inputFileSpecs));
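
Two 0.15-era changes show up in the job wiring above: Text replaces the
UTF8 key type, and the map input key/value classes are no longer set on
the JobConf (per the comments in the hunk, those calls are not used
starting with 0.15; the input types follow from the typed InputFormat).
A small sketch of that setup, using only Hadoop types in place of Pig's
Tuple/IndexedTuple and a placeholder class name:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;

    public class JobSetupSketch {
        public static void main(String[] args) {
            JobConf conf = new JobConf();
            // Only the output classes are declared; the map input key/value
            // types are implied by the InputFormat's typed RecordReader.
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(Text.class);
        }
    }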

Modified: incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigInputFormat.java?rev=596765&r1=596764&r2=596765&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigInputFormat.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigInputFormat.java Tue Nov 20 10:27:05 2007
@@ -24,7 +24,8 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -45,7 +46,7 @@
 import org.apache.tools.bzip2r.CBZip2InputStream;
 
 
-public class PigInputFormat implements InputFormat, JobConfigurable {
+public class PigInputFormat implements InputFormat<Text, Tuple>, JobConfigurable {
 
     public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
        
@@ -87,15 +88,15 @@
             //paths.add(path);
             for (int j = 0; j < paths.size(); j++) {
                 Path fullPath = new Path(fs.getWorkingDirectory(), paths.get(j));
-                if (fs.isDirectory(fullPath)) {
-                       Path children[] = fs.listPaths(fullPath);
+                if (fs.getFileStatus(fullPath).isDir()) {
+                       FileStatus children[] = fs.listStatus(fullPath);
                        for(int k = 0; k < children.length; k++) {
-                               paths.add(children[k]);
+                               paths.add(children[k].getPath());
                        }
                        continue;
                 }
-                long bs = fs.getBlockSize(fullPath);
-                long size = fs.getLength(fullPath);
+                long bs = fs.getFileStatus(fullPath).getBlockSize();
+                long size = fs.getFileStatus(fullPath).getLen();
                 long pos = 0;
                 String name = paths.get(j).getName();
                 if (name.endsWith(".gz")) {
@@ -114,7 +115,7 @@
         return splits.toArray(new PigSplit[splits.size()]);
     }
 
-   public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+   public RecordReader<Text, Tuple> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
         PigRecordReader r = new PigRecordReader(job, (PigSplit)split, compressionCodecs);
         return r;
     }
@@ -127,7 +128,7 @@
         codecList = conf.get("io.compression.codecs", "none");
     }
     
-    public static class PigRecordReader implements RecordReader {
+    public static class PigRecordReader implements RecordReader<Text, Tuple> {
         /**
          * This is a tremendously ugly hack to get around the fact that mappers do not have access
          * to their readers. We take advantage of the fact that RecordReader.next and Mapper.map is
@@ -182,15 +183,15 @@
         public JobConf getJobConf(){
                return job;
         }
-        
-        public boolean next(Writable key, Writable value) throws IOException {
+
+        public boolean next(Text key, Tuple value) throws IOException {
             Tuple t = loader.getNext();
             if (t == null) {
                 return false;
             }
 
-            ((UTF8) key).set(split.getPath().getName());
-            ((Tuple)value).copyFrom(t);
+            key.set(split.getPath().getName());
+            value.copyFrom(t);
             return true;
         }
 
@@ -206,11 +207,11 @@
             return split;
         }
 
-        public WritableComparable createKey() {
-            return new UTF8();
+        public Text createKey() {
+            return new Text();
         }
 
-        public Writable createValue() {
+        public Tuple createValue() {
             return new Tuple();
         }
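
PigRecordReader's move to RecordReader<Text, Tuple> means next(),
createKey() and createValue() are now typed, so the Writable casts in the
old code disappear. A self-contained toy reader showing the same typed
contract (the class and record below are hypothetical, not part of Pig):

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.RecordReader;

    // Emits exactly one record, just to illustrate the generic interface.
    public class SingleRecordReader implements RecordReader<Text, LongWritable> {
        private boolean done = false;

        public boolean next(Text key, LongWritable value) throws IOException {
            if (done) return false;
            key.set("only-record");   // typed key, no (UTF8)/(Text) cast
            value.set(42L);
            done = true;
            return true;
        }

        public Text createKey() { return new Text(); }

        public LongWritable createValue() { return new LongWritable(); }

        public long getPos() throws IOException { return done ? 1 : 0; }

        public float getProgress() throws IOException { return done ? 1.0f : 0.0f; }

        public void close() throws IOException { }
    }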
 

Modified: incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.jj
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.jj?rev=596765&r1=596764&r2=596765&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.jj (original)
+++ incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.jj Tue Nov 20 10:27:05 2007
@@ -37,6 +37,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
 
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.RunningJob; 
@@ -102,13 +103,14 @@
                if (!mDfs.exists(dfsPath))
                        throw new IOException("Directory " + path + " does not exist.");
 
-               if (mDfs.isDirectory(dfsPath)) 
+               if (mDfs.getFileStatus(dfsPath).isDir()) 
                {
-                       Path paths[] = mDfs.listPaths(dfsPath);
-                       for (int j = 0; j < paths.length; j++)
+                       FileStatus fileStat[] = mDfs.listStatus(dfsPath);
+                       for (int j = 0; j < fileStat.length; j++)
                        {
-                               if (!mDfs.isFile(paths[j])) continue;
-                               FSDataInputStream is = mDfs.open(paths[j]);
+                               Path curPath = fileStat[j].getPath();
+                               if (!mDfs.isFile(curPath)) continue;
+                               FSDataInputStream is = mDfs.open(curPath);
                                while ((rc = is.read(buffer)) > 0)
                                        System.out.write(buffer, 0, rc);
                                is.close();
@@ -134,7 +136,7 @@
                        if (!mDfs.exists(dfsDir))
                                throw new IOException("Directory " + path + " does not exist.");
 
-                       if (!mDfs.isDirectory(dfsDir))
+                       if (!mDfs.getFileStatus(dfsDir).isDir())
                                throw new IOException(path + " is not a directory.");
 
                        mDfs.setWorkingDirectory(dfsDir);
@@ -175,14 +177,13 @@
                                throw new IOException("File or directory " + path + " does not exist.");
                }
 
-               Path paths[] = mDfs.listPaths(dir);
-               for (int j = 0; j < paths.length; j++)
+               FileStatus fileStat[] = mDfs.listStatus(dir);
+               for (int j = 0; j < fileStat.length; j++)
                {
-                       Path curPath = paths[j];
-                       if (mDfs.isDirectory(curPath))
-                               System.out.println(curPath + "\t<dir>");
+            if (fileStat[j].isDir())
+                       System.out.println(fileStat[j].getPath() + "\t<dir>");
                        else
-                               System.out.println(curPath + "<r " + mDfs.getReplication(curPath) + ">\t" + mDfs.getLength(curPath));
+                               System.out.println(fileStat[j].getPath() + "<r " + fileStat[j].getReplication() + ">\t" + fileStat[j].getLen());
                 }
        }
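
The grunt changes above follow the same pattern as the rest of the commit:
directory tests go through getFileStatus().isDir() and listings come from
listStatus(). A short sketch of the cat-style loop, with a placeholder
class name and path:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CatSketch {
        public static void main(String[] args) throws IOException {
            FileSystem fs = FileSystem.get(new Configuration());
            byte[] buffer = new byte[65536];

            for (FileStatus stat : fs.listStatus(new Path("some/dir"))) {
                if (stat.isDir()) continue;      // skip subdirectories
                FSDataInputStream in = fs.open(stat.getPath());
                int rc;
                while ((rc = in.read(buffer)) > 0) {
                    System.out.write(buffer, 0, rc);
                }
                in.close();
            }
        }
    }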
 

