Author: pradeepkth
Date: Fri Nov 6 05:11:16 2009
New Revision: 833299
URL: http://svn.apache.org/viewvc?rev=833299&view=rev
Log:
Support comma separated file/directory names in load statements (rding via pradeepkth)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=833299&r1=833298&r2=833299&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Nov 6 05:11:16 2009
@@ -115,6 +115,9 @@
BUG FIXES
+PIG-1071: Support comma separated file/directory names in load statements
+(rding via pradeepkth)
+
PIG-970: Changes to make HBase loader work with HBase 0.20 (vbarat and zjffdu
via gates)
Modified:
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=833299&r1=833298&r2=833299&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri Nov 6 05:11:16 2009
@@ -173,13 +173,42 @@
    static int undollar(String s) {
        return Integer.parseInt(s.substring(1, s.length()));
    }
-
-
+
    String massageFilename(String filename, PigContext pigContext, boolean isLoad) throws IOException, ParseException {
+        String fname;
+
+        // If we converted the file name before, we return the old
+        // result. This is not done for performance but to make sure
+        // we return the same result for relative paths when
+        // re-parsing the same script for execution.
+        // For example if a script has
+        // a = load ...
+        // ...
+        // cd x
+        // ...
+        // store c into 'foo'; => the abs. path for 'foo' would be '/user/<username>/x/foo'
+        // ..
+        // cd y
+        // ..
+        // store f into 'bar' => the abs. path for 'bar' would be '/user/<username>/x/y/bar'
+        // While re-parsing, the current working dir is already at /user/<username>/x/y
+        // so translating 'foo' to its absolute path based on that dir would give incorrect
+        // results - hence we store the translations into a map during the first parse for
+        // use during the reparse.
+        if (null != (fname = fileNameMap.get(filename))) {
+            return fname;
+        } else {
+            fname = filename;
+        }
+
+        String[] fnames = StringUtils.getPathStrings(fname);
+
+        ArrayList<String> pathStrings = new ArrayList<String>();
+
        // If multiquery is off we revert to the old behavior, which
        // did not try to convert paths to their absolute location.
-        boolean isMultiQuery = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("opt.multiquery","true"));
+        boolean isMultiQuery = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("opt.multiquery","true"));
        if (!isMultiQuery) {
            if (!isLoad) { // stores do not require any change
                return filename;
@@ -187,68 +216,81 @@
            // Local loads in the hadoop context require copying the
            // file to dfs first.
-            if (pigContext.getExecType() != ExecType.LOCAL
-                    && filename.startsWith(FileLocalizer.LOCAL_PREFIX)) {
-                filename = FileLocalizer.hadoopify(filename, pigContext);
+            if (pigContext.getExecType() != ExecType.LOCAL) {
+                for (String strname : fnames) {
+                    if (strname.startsWith(FileLocalizer.LOCAL_PREFIX)) {
+                        strname = FileLocalizer.hadoopify(strname, pigContext);
+                    }
+                    pathStrings.add(strname);
+                }
            }
-            return filename;
+            return StringUtils.join(pathStrings, ",");
        }
-        String fname;
+        boolean hadoopify = false;
-        // If we converted the file name before, we return the old
-        // result. This is not done for performance but to make sure
-        // we return the same result for relative paths when
-        // re-parsing the same script for execution.
-        if (null != (fname = fileNameMap.get(filename))) {
-            return fname;
-        } else {
-            fname = filename;
-        }
+        for (String strname : fnames) {
-        String scheme, path;
+            boolean added = false;
+            String scheme, path;
-        if (fname.startsWith(FileLocalizer.LOCAL_PREFIX)) {
-            // We don't use hadoop path class to do the parsing,
-            // because our syntax of saying 'file:foo' for relative
-            // paths on the local FS is not a valid URL.
-            scheme = "file";
-            path = fname.substring(FileLocalizer.LOCAL_PREFIX.length());
-        } else {
-            // Path implements a custom uri parsing that takes care of
-            // unescaped characters (think globs). Using "new
-            // URI(fname)" would break.
-            URI uri = new Path(fname).toUri();
-
-            scheme = uri.getScheme();
-            if (scheme != null) {
-                scheme = scheme.toLowerCase();
+            if (strname.startsWith(FileLocalizer.LOCAL_PREFIX)) {
+                // We don't use hadoop path class to do the parsing,
+                // because our syntax of saying 'file:foo' for relative
+                // paths on the local FS is not a valid URL.
+                scheme = "file";
+                path = strname.substring(FileLocalizer.LOCAL_PREFIX.length());
+            } else {
+                // Path implements a custom uri parsing that takes care of
+                // unescaped characters (think globs). Using "new
+                // URI(fname)" would break.
+                URI uri = new Path(strname).toUri();
+
+                scheme = uri.getScheme();
+                if (scheme != null) {
+                    scheme = scheme.toLowerCase();
+                }
+
+                path = uri.getPath();
            }
-
-            path = uri.getPath();
-        }
-
-        if (scheme == null || scheme.equals("file") || scheme.equals("hdfs")) {
-            if (pigContext.getExecType() != ExecType.LOCAL) {
-                if (fname.startsWith(FileLocalizer.LOCAL_PREFIX)) {
-                    if (isLoad) {
-                        fname = FileLocalizer.hadoopify(fname, pigContext);
+
+            if (scheme == null || scheme.equals("file") || scheme.equals("hdfs")) {
+                if (pigContext.getExecType() != ExecType.LOCAL) {
+                    if (strname.startsWith(FileLocalizer.LOCAL_PREFIX)) {
+                        if (isLoad) {
+                            strname = FileLocalizer.hadoopify(strname, pigContext);
+                            // in the old code before we were parsing comma separated paths
+                            // the code would return at this point without storing in the
+                            // fileNameMap. So we should remember to not update the fileNameMap
+                            // at the end of this method
+                            hadoopify = true;
+                        }
+                        pathStrings.add(strname);
+                        added = true;
                    }
-                    return fname;
+                }
+                if (!added) {
+                    DataStorage dfs = pigContext.getDfs();
+                    ContainerDescriptor desc = dfs.getActiveContainer();
+                    ElementDescriptor el = dfs.asElement(desc, path);
+                    strname = el.toString();
                }
-            }
-        DataStorage dfs = pigContext.getDfs();
-        ContainerDescriptor desc = dfs.getActiveContainer();
-        ElementDescriptor el = dfs.asElement(desc, path);
-        fname = el.toString();
+            }
+
+            if (!added) {
+                pathStrings.add(strname);
+            }
        }
-        if (!fname.equals(filename)) {
+        fname = StringUtils.join(pathStrings, ",");
+
+        if (!fname.equals(filename) && !hadoopify) {
            fileNameMap.put(filename, fname);
        }
+
        return fname;
    }
-
+
    LogicalOperator parseCogroup(ArrayList<CogroupInput> gis, LogicalPlan lp, LOCogroup.GROUPTYPE type) throws ParseException, PlanException{
        log.trace("Entering parseCogroup");
@@ -776,6 +818,56 @@
        return output.toString() ;
    }
+
+    public static String join(AbstractCollection<String> s, String delimiter) {
+        if (s.isEmpty()) return "";
+        Iterator<String> iter = s.iterator();
+        StringBuffer buffer = new StringBuffer(iter.next());
+        while (iter.hasNext()) {
+            buffer.append(delimiter);
+            buffer.append(iter.next());
+        }
+        return buffer.toString();
+    }
+
+
+    public static String[] getPathStrings(String commaSeparatedPaths) {
+        int length = commaSeparatedPaths.length();
+        int curlyOpen = 0;
+        int pathStart = 0;
+        boolean globPattern = false;
+        List<String> pathStrings = new ArrayList<String>();
+
+        for (int i=0; i<length; i++) {
+            char ch = commaSeparatedPaths.charAt(i);
+            switch(ch) {
+                case '{' : {
+                    curlyOpen++;
+                    if (!globPattern) {
+                        globPattern = true;
+                    }
+                    break;
+                }
+                case '}' : {
+                    curlyOpen--;
+                    if (curlyOpen == 0 && globPattern) {
+                        globPattern = false;
+                    }
+                    break;
+                }
+                case ',' : {
+                    if (!globPattern) {
+                        pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
+                        pathStart = i + 1;
+                    }
+                    break;
+                }
+            }
+        }
+        pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
+
+        return pathStrings.toArray(new String[0]);
+    }
}
class FunctionType {
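The heart of the change is StringUtils.getPathStrings above: it splits the load location on commas, but only at brace depth zero, so a glob such as usr/pig/{a,c} survives as a single path. massageFilename then resolves each component independently and glues the results back together with the new join helper. A minimal standalone sketch of the same splitting behavior (class and method names here are illustrative, not part of the patch):

import java.util.ArrayList;
import java.util.List;

// Brace-aware comma split: commas inside {...} globs are part of the
// glob pattern, not path separators.
public class PathSplitDemo {
    static String[] getPathStrings(String commaSeparatedPaths) {
        List<String> paths = new ArrayList<String>();
        int depth = 0;      // current {...} nesting depth
        int start = 0;      // start index of the component being scanned
        for (int i = 0; i < commaSeparatedPaths.length(); i++) {
            char ch = commaSeparatedPaths.charAt(i);
            if (ch == '{') {
                depth++;
            } else if (ch == '}') {
                depth--;
            } else if (ch == ',' && depth == 0) {
                paths.add(commaSeparatedPaths.substring(start, i));
                start = i + 1;
            }
        }
        paths.add(commaSeparatedPaths.substring(start));
        return paths.toArray(new String[0]);
    }

    public static void main(String[] args) {
        // "usr/pig/{a,c},usr/pig/b" splits into two paths, not three:
        for (String p : getPathStrings("usr/pig/{a,c},usr/pig/b")) {
            System.out.println(p);  // prints usr/pig/{a,c} then usr/pig/b
        }
    }
}

The sketch collapses the patch's curlyOpen counter and globPattern flag into a single depth counter; for balanced braces the observable splitting is the same.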
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java?rev=833299&r1=833298&r2=833299&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java Fri Nov 6 05:11:16 2009
@@ -163,6 +163,36 @@
        checkLoadPath("t?s*","/tmp/t?s*");
    }
+    @Test
+    public void testCommaSeparatedString() throws Exception {
+        checkLoadPath("usr/pig/a,usr/pig/b","/tmp/usr/pig/a,/tmp/usr/pig/b");
+    }
+
+    @Test
+    public void testCommaSeparatedString2() throws Exception {
+        checkLoadPath("t?s*,test","/tmp/t?s*,/tmp/test");
+    }
+
+    @Test
+    public void testCommaSeparatedString3() throws Exception {
+        checkLoadPath("hdfs:/tmp/test,hdfs:/tmp/test2,hdfs:/tmp/test3","/tmp/test,/tmp/test2,/tmp/test3");
+    }
+
+    @Test
+    public void testCommaSeparatedString4() throws Exception {
+        checkLoadPath("usr/pig/{a,c},usr/pig/b","/tmp/usr/pig/{a,c},/tmp/usr/pig/b");
+    }
+
+    @Test
+    public void testCommaSeparatedString5() throws Exception {
+        checkLoadPath("/usr/pig/{a,c},usr/pig/b","/usr/pig/{a,c},/tmp/usr/pig/b");
+    }
+
+    @Test
+    public void testCommaSeparatedString6() throws Exception {
+        checkLoadPath("usr/pig/{a,c},/usr/pig/b","/tmp/usr/pig/{a,c},/usr/pig/b");
+    }
+
    private void checkLoadPath(String orig, String expected) throws Exception {
        checkLoadPath(orig, expected, false);
    }
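The new tests pin down the end-to-end behavior: each comma-separated component is resolved independently against the current working directory (/tmp in this fixture), already-absolute paths and brace globs stay rooted where they are, and an explicit hdfs: scheme resolves to the same rooted path. A self-contained sketch of that resolution rule, mirroring the expected values rather than Pig's actual DataStorage machinery (all names here are illustrative):

import java.util.ArrayList;
import java.util.List;

// Per-component path resolution as asserted by the tests above:
// relative components are prefixed with the working directory,
// absolute ones pass through, and a bare "hdfs:" scheme is dropped.
public class LoadPathDemo {
    // same brace-aware split as StringUtils.getPathStrings
    static List<String> split(String s) {
        List<String> parts = new ArrayList<String>();
        int depth = 0, start = 0;
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '{') depth++;
            else if (c == '}') depth--;
            else if (c == ',' && depth == 0) {
                parts.add(s.substring(start, i));
                start = i + 1;
            }
        }
        parts.add(s.substring(start));
        return parts;
    }

    static String resolve(String commaSeparated, String cwd) {
        List<String> out = new ArrayList<String>();
        for (String p : split(commaSeparated)) {
            if (p.startsWith("hdfs:")) {
                p = p.substring("hdfs:".length());   // "hdfs:/tmp/test" -> "/tmp/test"
            } else if (!p.startsWith("/")) {
                p = cwd + "/" + p;                   // relative -> under working dir
            }
            out.add(p);
        }
        return String.join(",", out);
    }

    public static void main(String[] args) {
        // mirrors testCommaSeparatedString5's expectation:
        System.out.println(resolve("/usr/pig/{a,c},usr/pig/b", "/tmp"));
        // prints: /usr/pig/{a,c},/tmp/usr/pig/b
    }
}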