Author: pradeepkth
Date: Fri Nov  6 05:11:16 2009
New Revision: 833299

URL: http://svn.apache.org/viewvc?rev=833299&view=rev
Log:
Support comma separated file/directory names in load statements (rding via pradeepkth)
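In effect, a single load statement can now name several inputs, e.g. `a = load 'usr/pig/a,usr/pig/b';`, and each comma separated path is resolved to its absolute location individually, while a comma inside a curly brace glob such as `usr/pig/{a,c}` is still treated as part of one path (see the new getPathStrings helper and the TestLoad cases below, plus the sketch after the diff).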
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=833299&r1=833298&r2=833299&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Nov  6 05:11:16 2009
@@ -115,6 +115,9 @@
 
 BUG FIXES
 
+PIG-1071: Support comma separated file/directory names in load statements
+(rding via pradeepkth)
+
 PIG-970: Changes to make HBase loader work with HBase 0.20 (vbarat and
 zjffdu via gates)
 
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=833299&r1=833298&r2=833299&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri Nov  6 05:11:16 2009
@@ -173,13 +173,42 @@
     static int undollar(String s) {
         return Integer.parseInt(s.substring(1, s.length()));
     }
-    
-
+
     String massageFilename(String filename, PigContext pigContext,
         boolean isLoad) throws IOException, ParseException {
+        String fname;
+
+        // If we converted the file name before, we return the old
+        // result. This is not done for performance but to make sure
+        // we return the same result for relative paths when
+        // re-parsing the same script for execution.
+        // For example if a script has
+        // a = load ...
+        // ...
+        // cd x
+        // ...
+        // store c into 'foo'; => the abs. path for 'foo' would be '/user/<username>/x/foo'
+        // ..
+        // cd y
+        // ..
+        // store f into 'bar'=> the abs. path for 'bar' would be '/user/<username>/x/y/bar'
+        // While re-parsing, the current working dir is already at /user/<username>/x/y
+        // so translating 'foo' to its absolute path based on that dir would give incorrect
+        // results - hence we store the translations into a map during the first parse for
+        // use during the reparse.
+        if (null != (fname = fileNameMap.get(filename))) {
+            return fname;
+        } else {
+            fname = filename;
+        }
+
+        String[] fnames = StringUtils.getPathStrings(fname);
+
+        ArrayList<String> pathStrings = new ArrayList<String>();
+
         // If multiquery is off we revert to the old behavior, which
         // did not try to convert paths to their absolute location.
-        boolean isMultiQuery = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("opt.multiquery","true"));
+        boolean isMultiQuery = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("opt.multiquery","true"));
         if (!isMultiQuery) {
             if (!isLoad) { // stores do not require any change
                 return filename;
@@ -187,68 +216,81 @@
             // Local loads in the hadoop context require copying the
             // file to dfs first.
-            if (pigContext.getExecType() != ExecType.LOCAL
-                && filename.startsWith(FileLocalizer.LOCAL_PREFIX)) {
-                filename = FileLocalizer.hadoopify(filename, pigContext);
+            if (pigContext.getExecType() != ExecType.LOCAL) {
+                for (String strname : fnames) {
+                    if (strname.startsWith(FileLocalizer.LOCAL_PREFIX)) {
+                        strname = FileLocalizer.hadoopify(strname, pigContext);
+                    }
+                    pathStrings.add(strname);
+                }
             }
-            return filename;
+            return StringUtils.join(pathStrings, ",");
         }
 
-        String fname;
+        boolean hadoopify = false;
 
-        // If we converted the file name before, we return the old
-        // result. This is not done for performance but to make sure
-        // we return the same result for relative paths when
-        // re-parsing the same script for execution.
-        if (null != (fname = fileNameMap.get(filename))) {
-            return fname;
-        } else {
-            fname = filename;
-        }
+        for (String strname : fnames) {
 
-        String scheme, path;
+            boolean added = false;
+            String scheme, path;
 
-        if (fname.startsWith(FileLocalizer.LOCAL_PREFIX)) {
-            // We don't use hadoop path class to do the parsing,
-            // because our syntax of saying 'file:foo' for relative
-            // paths on the local FS is not a valid URL.
-            scheme = "file";
-            path = fname.substring(FileLocalizer.LOCAL_PREFIX.length());
-        } else {
-            // Path implements a custom uri parsing that takes care of
-            // unescaped characters (think globs). Using "new
-            // URI(fname)" would break.
-            URI uri = new Path(fname).toUri();
-
-            scheme = uri.getScheme();
-            if (scheme != null) {
-                scheme = scheme.toLowerCase();
+            if (strname.startsWith(FileLocalizer.LOCAL_PREFIX)) {
+                // We don't use hadoop path class to do the parsing,
+                // because our syntax of saying 'file:foo' for relative
+                // paths on the local FS is not a valid URL.
+                scheme = "file";
+                path = strname.substring(FileLocalizer.LOCAL_PREFIX.length());
+            } else {
+                // Path implements a custom uri parsing that takes care of
+                // unescaped characters (think globs). Using "new
+                // URI(fname)" would break.
+                URI uri = new Path(strname).toUri();
+
+                scheme = uri.getScheme();
+                if (scheme != null) {
+                    scheme = scheme.toLowerCase();
+                }
+
+                path = uri.getPath();
             }
-
-            path = uri.getPath();
-        }
-
-        if (scheme == null || scheme.equals("file") || scheme.equals("hdfs")) {
-            if (pigContext.getExecType() != ExecType.LOCAL) {
-                if (fname.startsWith(FileLocalizer.LOCAL_PREFIX)) {
-                    if (isLoad) {
-                        fname = FileLocalizer.hadoopify(fname, pigContext);
+
+            if (scheme == null || scheme.equals("file") || scheme.equals("hdfs")) {
+                if (pigContext.getExecType() != ExecType.LOCAL) {
+                    if (strname.startsWith(FileLocalizer.LOCAL_PREFIX)) {
+                        if (isLoad) {
+                            strname = FileLocalizer.hadoopify(strname, pigContext);
+                            // in the old code before we were parsing comma separated paths
+                            // the code would return at this point without storing in the
+                            // fileNameMap. So we should remember to not update the fileNameMap
+                            // at the end of this method
+                            hadoopify = true;
+                        }
+                        pathStrings.add(strname);
+                        added = true;
                     }
-                    return fname;
+                }
+                if (!added) {
+                    DataStorage dfs = pigContext.getDfs();
+                    ContainerDescriptor desc = dfs.getActiveContainer();
+                    ElementDescriptor el = dfs.asElement(desc, path);
+                    strname = el.toString();
                 }
-            }
-            DataStorage dfs = pigContext.getDfs();
-            ContainerDescriptor desc = dfs.getActiveContainer();
-            ElementDescriptor el = dfs.asElement(desc, path);
-            fname = el.toString();
+            }
+
+            if (!added) {
+                pathStrings.add(strname);
+            }
         }
 
-        if (!fname.equals(filename)) {
+        fname = StringUtils.join(pathStrings, ",");
+
+        if (!fname.equals(filename) && !hadoopify) {
             fileNameMap.put(filename, fname);
         }
+
         return fname;
     }
-    
+
     LogicalOperator parseCogroup(ArrayList<CogroupInput> gis, LogicalPlan lp,
         LOCogroup.GROUPTYPE type) throws ParseException, PlanException{
         log.trace("Entering parseCogroup");
@@ -776,6 +818,56 @@
         return output.toString() ;
     }
+
+    public static String join(AbstractCollection<String> s, String delimiter) {
+        if (s.isEmpty()) return "";
+        Iterator<String> iter = s.iterator();
+        StringBuffer buffer = new StringBuffer(iter.next());
+        while (iter.hasNext()) {
+            buffer.append(delimiter);
+            buffer.append(iter.next());
+        }
+        return buffer.toString();
+    }
+
+
+    public static String[] getPathStrings(String commaSeparatedPaths) {
+        int length = commaSeparatedPaths.length();
+        int curlyOpen = 0;
+        int pathStart = 0;
+        boolean globPattern = false;
+        List<String> pathStrings = new ArrayList<String>();
+
+        for (int i=0; i<length; i++) {
+            char ch = commaSeparatedPaths.charAt(i);
+            switch(ch) {
+                case '{' : {
+                    curlyOpen++;
+                    if (!globPattern) {
+                        globPattern = true;
+                    }
+                    break;
+                }
+                case '}' : {
+                    curlyOpen--;
+                    if (curlyOpen == 0 && globPattern) {
+                        globPattern = false;
+                    }
+                    break;
+                }
+                case ',' : {
+                    if (!globPattern) {
+                        pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
+                        pathStart = i + 1 ;
+                    }
+                    break;
+                }
+            }
+        }
+        pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
+
+        return pathStrings.toArray(new String[0]);
+    }
 }
 
 class FunctionType {

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java?rev=833299&r1=833298&r2=833299&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java Fri Nov  6 05:11:16 2009
@@ -163,6 +163,36 @@
         checkLoadPath("t?s*","/tmp/t?s*");
     }
 
+    @Test
+    public void testCommaSeparatedString() throws Exception {
+        checkLoadPath("usr/pig/a,usr/pig/b","/tmp/usr/pig/a,/tmp/usr/pig/b");
+    }
+
+    @Test
+    public void testCommaSeparatedString2() throws Exception {
+        checkLoadPath("t?s*,test","/tmp/t?s*,/tmp/test");
+    }
+
+    @Test
+    public void testCommaSeparatedString3() throws Exception {
+        checkLoadPath("hdfs:/tmp/test,hdfs:/tmp/test2,hdfs:/tmp/test3","/tmp/test,/tmp/test2,/tmp/test3");
+    }
+
+    @Test
+    public void testCommaSeparatedString4() throws Exception {
+        checkLoadPath("usr/pig/{a,c},usr/pig/b","/tmp/usr/pig/{a,c},/tmp/usr/pig/b");
+    }
+
+    @Test
+    public void testCommaSeparatedString5() throws Exception {
+        checkLoadPath("/usr/pig/{a,c},usr/pig/b","/usr/pig/{a,c},/tmp/usr/pig/b");
+    }
+
+    @Test
+    public void testCommaSeparatedString6() throws Exception {
checkLoadPath("usr/pig/{a,c},/usr/pig/b","/tmp/usr/pig/{a,c},/usr/pig/b"); + } + private void checkLoadPath(String orig, String expected) throws Exception { checkLoadPath(orig, expected, false); }