Author: rding Date: Sat Aug 21 00:39:05 2010 New Revision: 987679 URL: http://svn.apache.org/viewvc?rev=987679&view=rev Log: PIG-1505: support jars and scripts in dfs
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/Main.java hadoop/pig/trunk/src/org/apache/pig/PigServer.java hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=987679&r1=987678&r2=987679&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Sat Aug 21 00:39:05 2010 @@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu IMPROVEMENTS +PIG-1505: support jars and scripts in dfs (anhi via rding) + PIG-1334: Make pig artifacts available through maven (niraj via rding) PIG-1466: Improve log messages for memory usage (thejas) Modified: hadoop/pig/trunk/src/org/apache/pig/Main.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Main.java?rev=987679&r1=987678&r2=987679&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/Main.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/Main.java Sat Aug 21 00:39:05 2010 @@ -17,11 +17,14 @@ */ package org.apache.pig; +import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; +import java.io.FileInputStream; import java.io.FileReader; import java.io.FileWriter; +import java.io.InputStream; import java.io.InputStreamReader; import java.io.IOException; import java.io.OutputStreamWriter; @@ -48,6 +51,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.fs.Path; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; @@ -149,6 +153,7 
@@ static int run(String args[], PigProgres opts.registerOpt('F', "stop_on_failure", CmdLineParser.ValueExpected.NOT_ACCEPTED); opts.registerOpt('M', "no_multiquery", CmdLineParser.ValueExpected.NOT_ACCEPTED); opts.registerOpt('P', "propertyFile", CmdLineParser.ValueExpected.REQUIRED); + opts.registerOpt('R', "prop", CmdLineParser.ValueExpected.REQUIRED); ExecMode mode = ExecMode.UNKNOWN; String file = null; @@ -278,10 +283,35 @@ static int run(String args[], PigProgres throw new RuntimeException("ERROR: Unrecognized exectype.", e); } break; + case 'P': - PropertiesUtil.loadPropertiesFromFile(properties, - opts.getValStr()); + { + InputStream inputStream = null; + try { + FileLocalizer.FetchFileRet localFileRet = FileLocalizer.fetchFile(properties, opts.getValStr()); + inputStream = new BufferedInputStream(new FileInputStream(localFileRet.file)); + properties.load(inputStream) ; + } catch (IOException e) { + throw new RuntimeException("Unable to parse properties file '" + opts.getValStr() + "'"); + } finally { + if (inputStream != null) { + try { + inputStream.close(); + } catch (IOException e) { + } + } + } + } + break; + + case 'R': + int idx = opts.getValStr().indexOf('='); + if (idx == -1 || idx == 0) { + throw new RuntimeException("Property '" + opts.getValStr() + "' not in valid form A=B"); + } + properties.put(opts.getValStr().substring(0, idx), opts.getValStr().substring(idx + 1)); break; + default: { Character cc = Character.valueOf(opt); throw new AssertionError("Unhandled option " + cc.toString()); @@ -327,12 +357,15 @@ static int run(String args[], PigProgres String substFile = null; switch (mode) { case FILE: { - // Run, using the provided file as a pig file - in = new BufferedReader(new FileReader(file)); + FileLocalizer.FetchFileRet localFileRet = FileLocalizer.fetchFile(properties, file); + if (localFileRet.didFetch) { + properties.setProperty("pig.jars.relative.to.dfs", "true"); + } + in = new BufferedReader(new FileReader(localFileRet.file)); // 
run parameter substitution preprocessor first substFile = file + ".substituted"; - pin = runParamPreprocessor(in, params, paramFiles, substFile, debug || dryrun || checkScriptOnly); + pin = runParamPreprocessor(properties, in, params, paramFiles, substFile, debug || dryrun || checkScriptOnly); if (dryrun) { log.info("Dry run completed. Substituted pig script is at " + substFile); return ReturnCode.SUCCESS; @@ -427,11 +460,16 @@ static int run(String args[], PigProgres throw new RuntimeException("Encountered unexpected arguments on command line - please check the command line."); } mode = ExecMode.FILE; - in = new BufferedReader(new FileReader(remainders[0])); + + FileLocalizer.FetchFileRet localFileRet = FileLocalizer.fetchFile(properties, remainders[0]); + if (localFileRet.didFetch) { + properties.setProperty("pig.jars.relative.to.dfs", "true"); + } + in = new BufferedReader(new FileReader(localFileRet.file)); // run parameter substitution preprocessor first substFile = remainders[0] + ".substituted"; - pin = runParamPreprocessor(in, params, paramFiles, substFile, debug || dryrun || checkScriptOnly); + pin = runParamPreprocessor(properties, in, params, paramFiles, substFile, debug || dryrun || checkScriptOnly); if (dryrun){ log.info("Dry run completed. 
Substituted pig script is at " + substFile); return ReturnCode.SUCCESS; @@ -449,7 +487,7 @@ static int run(String args[], PigProgres "PigLatin:" +new File(remainders[0]).getName() ); - scriptState.setScript(new File(remainders[0])); + scriptState.setScript(localFileRet.file); grunt = new Grunt(pin, pigContext); gruntCalled = true; @@ -572,9 +610,16 @@ private static void configureLog4J(Prope } // returns the stream of final pig script to be passed to Grunt -private static BufferedReader runParamPreprocessor(BufferedReader origPigScript, ArrayList<String> params, +private static BufferedReader runParamPreprocessor(Properties properties, BufferedReader origPigScript, ArrayList<String> params, ArrayList<String> paramFiles, String scriptFile, boolean createFile) throws org.apache.pig.tools.parameters.ParseException, IOException{ + + ArrayList<String> paramFiles2 = new ArrayList<String>(); + for (String param: paramFiles) { + FileLocalizer.FetchFileRet localFileRet = FileLocalizer.fetchFile(properties, param); + paramFiles2.add(localFileRet.file.getAbsolutePath()); + } + ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(50); String[] type1 = new String[1]; String[] type2 = new String[1]; @@ -582,13 +627,13 @@ private static BufferedReader runParamPr if (createFile){ BufferedWriter fw = new BufferedWriter(new FileWriter(scriptFile)); psp.genSubstitutedFile (origPigScript, fw, params.size() > 0 ? params.toArray(type1) : null, - paramFiles.size() > 0 ? paramFiles.toArray(type2) : null); + paramFiles.size() > 0 ? paramFiles2.toArray(type2) : null); return new BufferedReader(new FileReader (scriptFile)); } else { StringWriter writer = new StringWriter(); psp.genSubstitutedFile (origPigScript, writer, params.size() > 0 ? params.toArray(type1) : null, - paramFiles.size() > 0 ? paramFiles.toArray(type2) : null); + paramFiles.size() > 0 ? 
paramFiles2.toArray(type2) : null); return new BufferedReader(new StringReader(writer.toString())); } } @@ -648,6 +693,7 @@ public static void usage() System.out.println(" -F, -stop_on_failure - Aborts execution on the first failed job; default is off"); System.out.println(" -M, -no_multiquery - Turn multiquery optimization off; default is on"); System.out.println(" -P, -propertyFile - Path to property file"); + System.out.println(" -R, -prop - Property key value pair of the form key=value"); } public static void printProperties(){ Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=987679&r1=987678&r2=987679&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Sat Aug 21 00:39:05 2010 @@ -457,7 +457,7 @@ public class PigServer { URL resource = locateJarFromResources(name); if (resource == null) { - File f = new File(name); + File f = FileLocalizer.fetchFile(pigContext.getProperties(), name).file; if (!f.canRead()) { int errCode = 4002; Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=987679&r1=987678&r2=987679&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Sat Aug 21 00:39:05 2010 @@ -25,6 +25,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -38,12 +40,14 @@ import org.apache.hadoop.conf.Configurat 
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.pig.ExecType; +import org.apache.pig.PigException; import org.apache.pig.backend.datastorage.ContainerDescriptor; import org.apache.pig.backend.datastorage.DataStorage; import org.apache.pig.backend.datastorage.DataStorageException; import org.apache.pig.backend.datastorage.ElementDescriptor; import org.apache.pig.backend.datastorage.SeekableInputStream; import org.apache.pig.backend.datastorage.SeekableInputStream.FLAGS; +import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.datastorage.HDataStorage; import org.apache.pig.backend.hadoop.datastorage.HPath; @@ -695,4 +699,72 @@ public class FileLocalizer { } return line; } + + static File localTempDir = null; + static { + File f; + boolean success = true; + try { + f = File.createTempFile("pig", "tmp"); + success &= f.delete(); + success &= f.mkdir(); + localTempDir = f; + localTempDir.deleteOnExit(); + } catch (IOException e) { + } + if (!success) { + throw new RuntimeException("Error creating FileLocalizer temp directory."); + } + } + + public static class FetchFileRet { + public FetchFileRet(File file, boolean didFetch) { + this.file = file; + this.didFetch = didFetch; + } + public File file; + public boolean didFetch; + } + + /** + * Ensures that the given path is available on the local file system, + * copying it from a Hadoop FileSystem if necessary. The path is treated as + * local (and must already exist) when its URI scheme is "file" or "local", + * or when it has no scheme and pig.jars.relative.to.dfs is not "true". + * Otherwise it is copied, under its base file name, into the temporary + * directory created when FileLocalizer is loaded (or into java.io.tmpdir if + * that directory could not be created). The copy uses the file system + * resolved from the URI and the given properties, and is marked for + * deletion on exit. + * + * @return the local file, and whether it had to be fetched + * @throws ExecException if a local path does not exist or the copy fails; a + * path that is not valid URI syntax causes a RuntimeException instead + */ + public static FetchFileRet fetchFile(Properties properties, String filePath) throws IOException { + // Create URI from String.
+ URI fileUri = null; + try { + fileUri = new URI(filePath); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + // If URI is a local file, verify it exists and return. + if (((!"true".equals(properties.getProperty("pig.jars.relative.to.dfs"))) && (fileUri.getScheme() == null)) + || "file".equalsIgnoreCase(fileUri.getScheme()) + || "local".equalsIgnoreCase(fileUri.getScheme())) { + File res = new File(fileUri.getPath()); + if (!res.exists()) { + throw new ExecException("Local file '" + filePath + "' does not exist.", 101, PigException.INPUT); + } + return new FetchFileRet(res, false); + } else { + + Path src = new Path(fileUri.getPath()); + File parent = (localTempDir != null) ? localTempDir : new File(System.getProperty("java.io.tmpdir")); + File dest = new File(parent, src.getName()); + dest.deleteOnExit(); + try { + Configuration configuration = new Configuration(); + ConfigurationUtil.mergeConf(configuration, ConfigurationUtil.toConfiguration(properties)); + FileSystem srcFs = FileSystem.get(fileUri, configuration); + srcFs.copyToLocalFile(src, new Path(dest.getAbsolutePath())); + } catch (IOException e) { + throw new ExecException("Could not copy " + filePath + " to local destination " + dest, 101, PigException.INPUT, e); + } + return new FetchFileRet(dest, true); + } + } } Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=987679&r1=987678&r2=987679&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Sat Aug 21 00:39:05 2010 @@ -57,6 +57,8 @@ import org.apache.pig.backend.hadoop.dat import org.apache.pig.backend.hadoop.datastorage.HDataStorage; import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine; import 
org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.FileLocalizer; +import org.apache.pig.impl.io.FileLocalizer.FetchFileRet; import org.apache.pig.impl.util.LogUtils; import org.apache.pig.impl.util.TupleFormat; import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor; @@ -435,7 +437,8 @@ public class GruntParser extends PigScri boolean interactive; try { - String cmds = runPreprocessor(script, params, files); + FetchFileRet fetchFile = FileLocalizer.fetchFile(mConf, script); + String cmds = runPreprocessor(fetchFile.file.getAbsolutePath(), params, files); if (mInteractive && !batch) { // Write prompt and echo commands // Console reader treats tabs in a special way Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=987679&r1=987678&r2=987679&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java Sat Aug 21 00:39:05 2010 @@ -30,6 +30,7 @@ import java.util.Properties; import junit.framework.Assert; +import org.apache.hadoop.fs.Path; import org.apache.pig.ExecType; import org.apache.pig.PigRunner; import org.apache.pig.PigRunner.ReturnCode; @@ -102,6 +103,38 @@ public class TestPigRunner { } @Test + public void scriptsInDfsTest() throws Exception { + PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); + w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);"); + w.println("B = group A by a0;"); + w.println("C = foreach B generate group, COUNT(A);"); + w.println("store C into '" + OUTPUT_FILE + "';"); + w.close(); + Util.copyFromLocalToCluster(cluster, PIG_FILE, PIG_FILE); + Path inputInDfs = new Path(cluster.getFileSystem().getHomeDirectory(), PIG_FILE); + + try { + String[] args = { inputInDfs.toString() }; + PigStats stats = 
PigRunner.run(args, new TestNotificationListener()); + + assertTrue(stats.isSuccessful()); + + assertTrue(stats.getJobGraph().size() == 1); + String name = stats.getOutputNames().get(0); + assertEquals(OUTPUT_FILE, name); + assertEquals(12, stats.getBytesWritten()); + assertEquals(3, stats.getRecordWritten()); + + assertEquals("A,B,C", + ((JobStats)stats.getJobGraph().getSinks().get(0)).getAlias()); + } finally { + new File(PIG_FILE).delete(); + Util.deleteFile(cluster, PIG_FILE); + Util.deleteFile(cluster, OUTPUT_FILE); + } + } + + @Test public void orderByTest() throws Exception { PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");