Author: pradeepkth
Date: Fri Nov  6 05:11:16 2009
New Revision: 833299

URL: http://svn.apache.org/viewvc?rev=833299&view=rev
Log:
Support comma separated file/directory names in load statements (rding via 
pradeepkth)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=833299&r1=833298&r2=833299&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Nov  6 05:11:16 2009
@@ -115,6 +115,9 @@
 
 BUG FIXES
 
+PIG-1071: Support comma separated file/directory names in load statements
+(rding via pradeepkth)
+
 PIG-970:  Changes to make HBase loader work with HBase 0.20 (vbarat and zjffdu
              via gates)
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=833299&r1=833298&r2=833299&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri Nov  6 05:11:16 2009
@@ -173,13 +173,42 @@
        static int undollar(String s) {
                return Integer.parseInt(s.substring(1, s.length()));    
        }
-       
-
+           
     String massageFilename(String filename, PigContext pigContext, boolean 
isLoad) throws IOException, ParseException {
 
+        String fname;
+
+        // If we converted the file name before, we return the old
+        // result. This is not done for performance but to make sure
+        // we return the same result for relative paths when
+        // re-parsing the same script for execution.
+        // For example if a script has
+        // a = load ...
+        // ...
+        // cd x
+        // ...
+        // store c into 'foo'; => the abs. path for 'foo' would be '/user/<username>/x/foo'
+        // ..
+        // cd y
+        // ..
+        // store f into 'bar'; => the abs. path for 'bar' would be '/user/<username>/x/y/bar'
+        // While re-parsing, the current working dir is already at /user/<username>/x/y,
+        // so translating 'foo' to its absolute path based on that dir would give incorrect
+        // results - hence we store the translations into a map during the first parse for
+        // use during the reparse.
+        if (null != (fname = fileNameMap.get(filename))) {
+            return fname;
+        } else {
+            fname = filename;
+        }
+        
+        String[] fnames = StringUtils.getPathStrings(fname);
+        
+        ArrayList<String> pathStrings = new ArrayList<String>();
+        
         // If multiquery is off we revert to the old behavior, which
         // did not try to convert paths to their absolute location.
-       boolean isMultiQuery = 
"true".equalsIgnoreCase(pigContext.getProperties().getProperty("opt.multiquery","true"));
+        boolean isMultiQuery = 
"true".equalsIgnoreCase(pigContext.getProperties().getProperty("opt.multiquery","true"));
         if (!isMultiQuery) {
             if (!isLoad) { // stores do not require any change
                 return filename;
@@ -187,68 +216,81 @@
 
             // Local loads in the hadoop context require copying the
             // file to dfs first.
-            if (pigContext.getExecType() != ExecType.LOCAL 
-                && filename.startsWith(FileLocalizer.LOCAL_PREFIX)) {
-                    filename = FileLocalizer.hadoopify(filename, pigContext);
+            if (pigContext.getExecType() != ExecType.LOCAL) {
+                for (String strname : fnames) { 
+                    if (strname.startsWith(FileLocalizer.LOCAL_PREFIX)) {
+                        strname = FileLocalizer.hadoopify(strname, pigContext);
+                    }
+                    pathStrings.add(strname);
+                 }
             }
-            return filename;
+            return StringUtils.join(pathStrings, ",");
         }
 
-        String fname;
+        boolean hadoopify = false;
 
-        // If we converted the file name before, we return the old
-        // result. This is not done for performance but to make sure
-        // we return the same result for relative paths when
-        // re-parsing the same script for execution.
-        if (null != (fname = fileNameMap.get(filename))) {
-            return fname;
-        } else {
-            fname = filename;
-        }
+        for (String strname : fnames) {
 
-        String scheme, path;
+            boolean added = false;
+            String scheme, path;
 
-        if (fname.startsWith(FileLocalizer.LOCAL_PREFIX)) {
-            // We don't use hadoop path class to do the parsing,
-            // because our syntax of saying 'file:foo' for relative
-            // paths on the local FS is not a valid URL.
-            scheme = "file";
-            path = fname.substring(FileLocalizer.LOCAL_PREFIX.length());
-        } else {
-            // Path implements a custom uri parsing that takes care of
-            // unescaped characters (think globs). Using "new
-            // URI(fname)" would break.
-            URI uri = new Path(fname).toUri();
-            
-            scheme = uri.getScheme();
-            if (scheme != null) {
-                scheme = scheme.toLowerCase();
+            if (strname.startsWith(FileLocalizer.LOCAL_PREFIX)) {
+                // We don't use hadoop path class to do the parsing,
+                // because our syntax of saying 'file:foo' for relative
+                // paths on the local FS is not a valid URL.
+                scheme = "file";
+                path = strname.substring(FileLocalizer.LOCAL_PREFIX.length());
+            } else {
+                // Path implements a custom uri parsing that takes care of
+                // unescaped characters (think globs). Using "new
+                // URI(fname)" would break.
+                URI uri = new Path(strname).toUri();
+
+                scheme = uri.getScheme();
+                if (scheme != null) {
+                    scheme = scheme.toLowerCase();
+                }
+
+                path = uri.getPath();
             }
-            
-            path = uri.getPath();
-        }
-        
-        if (scheme == null || scheme.equals("file") || scheme.equals("hdfs")) {
-            if (pigContext.getExecType() != ExecType.LOCAL) {
-                if (fname.startsWith(FileLocalizer.LOCAL_PREFIX)) {
-                    if (isLoad) {
-                        fname = FileLocalizer.hadoopify(fname, pigContext);
+
+            if (scheme == null || scheme.equals("file") || 
scheme.equals("hdfs")) {
+                if (pigContext.getExecType() != ExecType.LOCAL) {
+                    if (strname.startsWith(FileLocalizer.LOCAL_PREFIX)) {
+                        if (isLoad) {
+                            strname = FileLocalizer.hadoopify(strname, 
pigContext);
+                            // Before comma-separated paths were supported, the old code
+                            // returned at this point without storing the result in the
+                            // fileNameMap, so we must remember not to update the fileNameMap
+                            // at the end of this method.
+                            hadoopify = true;
+                        }
+                        pathStrings.add(strname);
+                        added = true;
                     }
-                    return fname;
+                }    
+                if (!added) {   
+                    DataStorage dfs = pigContext.getDfs();
+                    ContainerDescriptor desc = dfs.getActiveContainer();
+                    ElementDescriptor el = dfs.asElement(desc, path);
+                    strname = el.toString();
                 }
-            }       
-            DataStorage dfs = pigContext.getDfs();
-            ContainerDescriptor desc = dfs.getActiveContainer();
-            ElementDescriptor el = dfs.asElement(desc, path);
-            fname = el.toString();
+            }
+
+            if (!added) {
+                pathStrings.add(strname);
+            }
         }
 
-        if (!fname.equals(filename)) {
+        fname = StringUtils.join(pathStrings, ",");
+
+        if (!fname.equals(filename) && !hadoopify) {
             fileNameMap.put(filename, fname);
         }
+
         return fname;
     }
-       
+
        LogicalOperator parseCogroup(ArrayList<CogroupInput> gis, LogicalPlan 
lp, LOCogroup.GROUPTYPE type) throws ParseException, PlanException{
                
                log.trace("Entering parseCogroup");
@@ -776,6 +818,56 @@
             
             return output.toString() ;
         }
+        
+        public static String join(AbstractCollection<String> s, String 
delimiter) {
+            if (s.isEmpty()) return "";
+            Iterator<String> iter = s.iterator();
+            StringBuffer buffer = new StringBuffer(iter.next());
+            while (iter.hasNext()) {
+                buffer.append(delimiter);
+                buffer.append(iter.next());
+            }
+            return buffer.toString();
+        }
+        
+            
+        public static String[] getPathStrings(String commaSeparatedPaths) {
+            int length = commaSeparatedPaths.length();
+            int curlyOpen = 0;
+            int pathStart = 0;
+            boolean globPattern = false;
+            List<String> pathStrings = new ArrayList<String>();
+        
+            for (int i=0; i<length; i++) {
+                char ch = commaSeparatedPaths.charAt(i);
+                switch(ch) {
+                    case '{' : {
+                        curlyOpen++;
+                        if (!globPattern) {
+                            globPattern = true;
+                        }
+                        break;
+                    }
+                    case '}' : {
+                        curlyOpen--;
+                        if (curlyOpen == 0 && globPattern) {
+                            globPattern = false;
+                        }
+                        break;
+                    }
+                    case ',' : {
+                        if (!globPattern) {
+                            
pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
+                            pathStart = i + 1 ;
+                        }
+                        break;
+                    }
+                }
+            }
+            pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
+        
+            return pathStrings.toArray(new String[0]);
+        }
 }
 
 class FunctionType {

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java?rev=833299&r1=833298&r2=833299&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java Fri Nov  6 05:11:16 2009
@@ -163,6 +163,36 @@
         checkLoadPath("t?s*","/tmp/t?s*");
     }
 
+    @Test
+    public void testCommaSeparatedString() throws Exception {
+        checkLoadPath("usr/pig/a,usr/pig/b","/tmp/usr/pig/a,/tmp/usr/pig/b");
+    }
+
+    @Test
+    public void testCommaSeparatedString2() throws Exception {
+        checkLoadPath("t?s*,test","/tmp/t?s*,/tmp/test");
+    }
+
+    @Test
+    public void testCommaSeparatedString3() throws Exception {
+        
checkLoadPath("hdfs:/tmp/test,hdfs:/tmp/test2,hdfs:/tmp/test3","/tmp/test,/tmp/test2,/tmp/test3");
+    }
+    
+    @Test
+    public void testCommaSeparatedString4() throws Exception {
+        
checkLoadPath("usr/pig/{a,c},usr/pig/b","/tmp/usr/pig/{a,c},/tmp/usr/pig/b");
+    }
+
+    @Test
+    public void testCommaSeparatedString5() throws Exception {
+        
checkLoadPath("/usr/pig/{a,c},usr/pig/b","/usr/pig/{a,c},/tmp/usr/pig/b");
+    }
+    
+    @Test
+    public void testCommaSeparatedString6() throws Exception {
+        
checkLoadPath("usr/pig/{a,c},/usr/pig/b","/tmp/usr/pig/{a,c},/usr/pig/b");
+    }    
+
     private void checkLoadPath(String orig, String expected) throws Exception {
         checkLoadPath(orig, expected, false);
     }


Reply via email to