Author: rding
Date: Sat Aug 21 00:39:05 2010
New Revision: 987679

URL: http://svn.apache.org/viewvc?rev=987679&view=rev
Log:
PIG-1505: support jars and scripts in dfs

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/Main.java
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
    hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=987679&r1=987678&r2=987679&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Sat Aug 21 00:39:05 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
 
 IMPROVEMENTS
 
+PIG-1505: support jars and scripts in dfs (anhi via rding)
+
 PIG-1334: Make pig artifacts available through maven (niraj via rding)
 
 PIG-1466: Improve log messages for memory usage (thejas)

Modified: hadoop/pig/trunk/src/org/apache/pig/Main.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Main.java?rev=987679&r1=987678&r2=987679&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/Main.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/Main.java Sat Aug 21 00:39:05 2010
@@ -17,11 +17,14 @@
  */
 package org.apache.pig;
 
+import java.io.BufferedInputStream;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileReader;
 import java.io.FileWriter;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
@@ -48,6 +51,7 @@ import org.apache.commons.logging.LogFac
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
@@ -149,6 +153,7 @@ static int run(String args[], PigProgres
         opts.registerOpt('F', "stop_on_failure", 
CmdLineParser.ValueExpected.NOT_ACCEPTED);
         opts.registerOpt('M', "no_multiquery", 
CmdLineParser.ValueExpected.NOT_ACCEPTED);
         opts.registerOpt('P', "propertyFile", 
CmdLineParser.ValueExpected.REQUIRED);
+        opts.registerOpt('R', "prop", CmdLineParser.ValueExpected.REQUIRED);
 
         ExecMode mode = ExecMode.UNKNOWN;
         String file = null;
@@ -278,10 +283,35 @@ static int run(String args[], PigProgres
                         throw new RuntimeException("ERROR: Unrecognized 
exectype.", e);
                     }
                 break;
+                
             case 'P':
-                PropertiesUtil.loadPropertiesFromFile(properties,
-                        opts.getValStr());
+            {
+                InputStream inputStream = null;
+                try {
+                    FileLocalizer.FetchFileRet localFileRet = 
FileLocalizer.fetchFile(properties, opts.getValStr());
+                    inputStream = new BufferedInputStream(new 
FileInputStream(localFileRet.file));
+                    properties.load(inputStream) ;
+                } catch (IOException e) {
+                    throw new RuntimeException("Unable to parse properties 
file '" + opts.getValStr() + "'");
+                } finally {
+                    if (inputStream != null) {
+                        try {
+                            inputStream.close();
+                        } catch (IOException e) {
+                        } 
+                    }
+                }
+            }
+            break;
+            
+            case 'R':
+                int idx = opts.getValStr().indexOf('=');
+                if (idx == -1 || idx == 0) {
+                    throw new RuntimeException("Property '" + opts.getValStr() 
+ "' not in valid form A=B");
+                }
+                properties.put(opts.getValStr().substring(0, idx), 
opts.getValStr().substring(idx + 1));
                 break;
+              
             default: {
                 Character cc = Character.valueOf(opt);
                 throw new AssertionError("Unhandled option " + cc.toString());
@@ -327,12 +357,15 @@ static int run(String args[], PigProgres
         String substFile = null;
         switch (mode) {
         case FILE: {
-            // Run, using the provided file as a pig file
-            in = new BufferedReader(new FileReader(file));
+            FileLocalizer.FetchFileRet localFileRet = 
FileLocalizer.fetchFile(properties, file);
+            if (localFileRet.didFetch) {
+                properties.setProperty("pig.jars.relative.to.dfs", "true");
+            }
+            in = new BufferedReader(new FileReader(localFileRet.file));
 
             // run parameter substitution preprocessor first
             substFile = file + ".substituted";
-            pin = runParamPreprocessor(in, params, paramFiles, substFile, 
debug || dryrun || checkScriptOnly);
+            pin = runParamPreprocessor(properties, in, params, paramFiles, 
substFile, debug || dryrun || checkScriptOnly);
             if (dryrun) {
                 log.info("Dry run completed. Substituted pig script is at " + 
substFile);
                 return ReturnCode.SUCCESS;
@@ -427,11 +460,16 @@ static int run(String args[], PigProgres
                    throw new RuntimeException("Encountered unexpected 
arguments on command line - please check the command line.");
             }
             mode = ExecMode.FILE;
-            in = new BufferedReader(new FileReader(remainders[0]));
+            
+            FileLocalizer.FetchFileRet localFileRet = 
FileLocalizer.fetchFile(properties, remainders[0]);
+            if (localFileRet.didFetch) {
+                properties.setProperty("pig.jars.relative.to.dfs", "true");
+            }
+            in = new BufferedReader(new FileReader(localFileRet.file));
 
             // run parameter substitution preprocessor first
             substFile = remainders[0] + ".substituted";
-            pin = runParamPreprocessor(in, params, paramFiles, substFile, 
debug || dryrun || checkScriptOnly);
+            pin = runParamPreprocessor(properties, in, params, paramFiles, 
substFile, debug || dryrun || checkScriptOnly);
             if (dryrun){
                 log.info("Dry run completed. Substituted pig script is at " + 
substFile);
                 return ReturnCode.SUCCESS;
@@ -449,7 +487,7 @@ static int run(String args[], PigProgres
                                                    "PigLatin:" +new 
File(remainders[0]).getName()
             );
 
-            scriptState.setScript(new File(remainders[0]));
+            scriptState.setScript(localFileRet.file);
             
             grunt = new Grunt(pin, pigContext);
             gruntCalled = true;
@@ -572,9 +610,16 @@ private static void configureLog4J(Prope
 }
  
 // returns the stream of final pig script to be passed to Grunt
-private static BufferedReader runParamPreprocessor(BufferedReader 
origPigScript, ArrayList<String> params,
+private static BufferedReader runParamPreprocessor(Properties properties, 
BufferedReader origPigScript, ArrayList<String> params,
                                             ArrayList<String> paramFiles, 
String scriptFile, boolean createFile) 
                                 throws 
org.apache.pig.tools.parameters.ParseException, IOException{
+    
+    ArrayList<String> paramFiles2 = new ArrayList<String>();
+    for (String param: paramFiles) {
+        FileLocalizer.FetchFileRet localFileRet = 
FileLocalizer.fetchFile(properties, param);
+        paramFiles2.add(localFileRet.file.getAbsolutePath());
+    }    
+    
     ParameterSubstitutionPreprocessor psp = new 
ParameterSubstitutionPreprocessor(50);
     String[] type1 = new String[1];
     String[] type2 = new String[1];
@@ -582,13 +627,13 @@ private static BufferedReader runParamPr
     if (createFile){
         BufferedWriter fw = new BufferedWriter(new FileWriter(scriptFile));
         psp.genSubstitutedFile (origPigScript, fw, params.size() > 0 ? 
params.toArray(type1) : null, 
-                                paramFiles.size() > 0 ? 
paramFiles.toArray(type2) : null);
+                                paramFiles.size() > 0 ? 
paramFiles2.toArray(type2) : null);
         return new BufferedReader(new FileReader (scriptFile));
 
     } else {
         StringWriter writer = new StringWriter();
         psp.genSubstitutedFile (origPigScript, writer,  params.size() > 0 ? 
params.toArray(type1) : null, 
-                                paramFiles.size() > 0 ? 
paramFiles.toArray(type2) : null);
+                                paramFiles.size() > 0 ? 
paramFiles2.toArray(type2) : null);
         return new BufferedReader(new StringReader(writer.toString()));
     }
 }
@@ -648,6 +693,7 @@ public static void usage()
         System.out.println("    -F, -stop_on_failure - Aborts execution on the 
first failed job; default is off");
         System.out.println("    -M, -no_multiquery - Turn multiquery 
optimization off; default is on");
         System.out.println("    -P, -propertyFile - Path to property file");
+        System.out.println("    -R, -prop - Property key value pair of the 
form key=value");
 }
 
 public static void printProperties(){

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=987679&r1=987678&r2=987679&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Sat Aug 21 00:39:05 2010
@@ -457,7 +457,7 @@ public class PigServer {
             URL resource = locateJarFromResources(name);
 
             if (resource == null) {
-                File f = new File(name);
+                File f = FileLocalizer.fetchFile(pigContext.getProperties(), 
name).file;
                 
                 if (!f.canRead()) {
                     int errCode = 4002;

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=987679&r1=987678&r2=987679&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Sat Aug 21 
00:39:05 2010
@@ -25,6 +25,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -38,12 +40,14 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.pig.ExecType;
+import org.apache.pig.PigException;
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.datastorage.DataStorageException;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.apache.pig.backend.datastorage.SeekableInputStream;
 import org.apache.pig.backend.datastorage.SeekableInputStream.FLAGS;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.backend.hadoop.datastorage.HPath;
@@ -695,4 +699,72 @@ public class FileLocalizer {
         }
         return line;
     }
+    
+    static File localTempDir = null;
+    static {
+        File f;
+        boolean success = true;
+        try {
+            f = File.createTempFile("pig", "tmp");
+            success &= f.delete();
+            success &= f.mkdir();
+            localTempDir = f;
+            localTempDir.deleteOnExit();
+        } catch (IOException e) {
+        }
+        if (!success) {
+          throw new RuntimeException("Error creating FileLocalizer temp 
directory.");
+        }
+    }    
+    
+    public static class FetchFileRet {
+        public FetchFileRet(File file, boolean didFetch) {
+            this.file = file;
+            this.didFetch = didFetch;
+        }
+        public File file;
+        public boolean didFetch;
+    }
+
+    /**
+     * Ensures that the passed path is on the local file system, fetching it 
+     * to the java.io.tmpdir if necessary. If pig.jars.relative.to.dfs is true
+     * and dfs is not null, then a relative path is assumed to be relative to 
the passed 
+     * dfs active directory. Else they are assumed to be relative to the local 
working
+     * directory.
+     */
+    public static FetchFileRet fetchFile(Properties properties, String 
filePath) throws IOException {
+        // Create URI from String.
+        URI fileUri = null;
+        try {
+            fileUri = new URI(filePath);
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+        // If URI is a local file, verify it exists and return.
+        if 
(((!"true".equals(properties.getProperty("pig.jars.relative.to.dfs"))) && 
(fileUri.getScheme() == null))
+                || "file".equalsIgnoreCase(fileUri.getScheme())
+                || "local".equalsIgnoreCase(fileUri.getScheme())) {
+            File res = new File(fileUri.getPath());
+            if (!res.exists()) {
+                throw new ExecException("Local file '" + filePath + "' does 
not exist.", 101, PigException.INPUT);
+            }
+            return new FetchFileRet(res, false);
+        } else {
+            
+            Path src = new Path(fileUri.getPath());
+            File parent = (localTempDir != null) ? localTempDir : new 
File(System.getProperty("java.io.tmpdir")); 
+            File dest = new File(parent, src.getName());
+            dest.deleteOnExit();
+            try {
+                Configuration configuration = new Configuration();
+                ConfigurationUtil.mergeConf(configuration, 
ConfigurationUtil.toConfiguration(properties));
+                FileSystem srcFs = FileSystem.get(fileUri, configuration);
+                srcFs.copyToLocalFile(src, new Path(dest.getAbsolutePath()));
+            } catch (IOException e) {
+                throw new ExecException("Could not copy " + filePath + " to 
local destination " + dest, 101, PigException.INPUT, e);
+            }
+            return new FetchFileRet(dest, true);
+        }
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=987679&r1=987678&r2=987679&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Sat Aug 21 
00:39:05 2010
@@ -57,6 +57,8 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileLocalizer.FetchFileRet;
 import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.impl.util.TupleFormat;
 import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
@@ -435,7 +437,8 @@ public class GruntParser extends PigScri
         boolean interactive;
          
         try {
-            String cmds = runPreprocessor(script, params, files);
+            FetchFileRet fetchFile = FileLocalizer.fetchFile(mConf, script);
+            String cmds = runPreprocessor(fetchFile.file.getAbsolutePath(), 
params, files);
 
             if (mInteractive && !batch) { // Write prompt and echo commands
                 // Console reader treats tabs in a special way

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=987679&r1=987678&r2=987679&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java Sat Aug 21 
00:39:05 2010
@@ -30,6 +30,7 @@ import java.util.Properties;
 
 import junit.framework.Assert;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigRunner;
 import org.apache.pig.PigRunner.ReturnCode;
@@ -102,6 +103,38 @@ public class TestPigRunner {
     }
     
     @Test
+    public void scriptsInDfsTest() throws Exception {
+        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, 
a2:int);");
+        w.println("B = group A by a0;");
+        w.println("C = foreach B generate group, COUNT(A);");
+        w.println("store C into '" + OUTPUT_FILE + "';");
+        w.close();
+        Util.copyFromLocalToCluster(cluster, PIG_FILE, PIG_FILE);
+        Path inputInDfs = new Path(cluster.getFileSystem().getHomeDirectory(), 
PIG_FILE);
+        
+        try {
+            String[] args = { inputInDfs.toString() };
+            PigStats stats = PigRunner.run(args, new 
TestNotificationListener());
+     
+            assertTrue(stats.isSuccessful());
+            
+            assertTrue(stats.getJobGraph().size() == 1);
+            String name = stats.getOutputNames().get(0);
+            assertEquals(OUTPUT_FILE, name);
+            assertEquals(12, stats.getBytesWritten());
+            assertEquals(3, stats.getRecordWritten());       
+            
+            assertEquals("A,B,C",
+                    
((JobStats)stats.getJobGraph().getSinks().get(0)).getAlias());
+        } finally {
+            new File(PIG_FILE).delete();
+            Util.deleteFile(cluster, PIG_FILE);
+            Util.deleteFile(cluster, OUTPUT_FILE);
+        }
+    }
+    
+    @Test
     public void orderByTest() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
         w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, 
a2:int);");


[Truncated mailing-list footer: "Reply via email to" — remainder of the archive footer was lost in extraction.]