Author: olga
Date: Thu May  8 19:40:10 2008
New Revision: 654669

URL: http://svn.apache.org/viewvc?rev=654669&view=rev
Log:
PIG-232: let valid cache statements in

Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java
    incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=654669&r1=654668&r2=654669&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Thu May  8 19:40:10 2008
@@ -289,3 +289,5 @@
 
     PIG-232: fix for number of input records when BinaryStirage is used
     (acmurthy via olgan)
+
+    PIG-232: let valid cache specifications through (acmurthy via olgan)

Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=654669&r1=654668&r2=654669&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Thu May  
8 19:40:10 2008
@@ -344,6 +344,27 @@
         return elem.exists() || globMatchesFiles(elem, store);
     }
 
+    public static boolean isFile(String filename, PigContext context)
+    throws IOException {
+        return !isDirectory(filename, context.getDfs());
+    }
+
+    public static boolean isFile(String filename, DataStorage store)
+    throws IOException {
+        return !isDirectory(filename, store);
+    }
+
+    public static boolean isDirectory(String filename, PigContext context)
+    throws IOException {
+        return isDirectory(filename, context.getDfs());
+    }
+
+    public static boolean isDirectory(String filename, DataStorage store)
+    throws IOException {
+        ElementDescriptor elem = store.asElement(filename);
+        return (elem instanceof ContainerDescriptor);
+    }
+
     private static boolean globMatchesFiles(ElementDescriptor elem,
                                             DataStorage fs)
             throws IOException

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java?rev=654669&r1=654668&r2=654669&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java 
(original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java 
Thu May  8 19:40:10 2008
@@ -3,6 +3,8 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -142,7 +144,11 @@
         // Validate
         File file = new File(path);
         if (!file.exists()) {
-            throw new IOException("Invalid ship specification: " + path);
+            throw new IOException("Invalid ship specification: '" + path + 
+                                  "' does not exist!");
+        } else if (file.isDirectory()) {
+            throw new IOException("Invalid ship specification: '" + path + 
+                                  "' is a directory and can't be shipped!");
         }
         
         shipSpec.add(path);
@@ -156,10 +162,35 @@
      */
     public void addPathToCache(String path) throws IOException {
         // Validate
-        if (!FileLocalizer.fileExists(path, pigContext)) {
+        URI pathUri = null;
+        URI dfsPath = null;
+        try {
+            pathUri = new URI(path);
+            
+            // Strip away the URI's _fragment_ and _query_
+            dfsPath = new URI(pathUri.getScheme(), pathUri.getAuthority(), 
+                              pathUri.getPath(), null, null);
+        } catch (URISyntaxException urise) {
             throw new IOException("Invalid cache specification: " + path);
         }
         
+        boolean exists = false;
+        try {
+            exists = FileLocalizer.fileExists(dfsPath.toString(), pigContext);
+        } catch (IOException ioe) {
+            // Throw a better error message...
+            throw new IOException("Invalid cache specification: '" + dfsPath + 
+                                  "' does not exist!");
+        } 
+        
+        if (!exists) {
+            throw new IOException("Invalid cache specification: '" + dfsPath + 
+                                  "' does not exist!");
+        } else if (FileLocalizer.isDirectory(dfsPath.toString(), pigContext)) {
+            throw new IOException("Invalid cache specification: '" + dfsPath + 
+                                  "' is a directory and can't be cached!");
+        }
+
         cacheSpec.add(path);
     }
 

Modified: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=654669&r1=654668&r2=654669&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Thu May  8 
19:40:10 2008
@@ -248,6 +248,79 @@
     }
 
     @Test
+    public void testInputCacheSpecs() throws Exception {
+        // Can't run this without HDFS
+        if(execType == ExecType.LOCAL)
+            return;
+        
+        File input = Util.createInputFile("tmp", "", 
+                                          new String[] {"A,1", "B,2", "C,3", 
+                                                        "D,2", "A,5", "B,5", 
+                                                        "C,8", "A,8", "D,8", 
+                                                        "A,9"});
+
+        // Perl script 
+        String[] script = 
+            new String[] {
+                          "#!/usr/bin/perl",
+                          "open(INFILE,  $ARGV[0]) or die \"Can't open 
\".$ARGV[0].\"!: $!\";",
+                          "while (<INFILE>) {",
+                          "  chomp $_;",
+                          "  print STDOUT \"$_\n\";",
+                          "  print STDERR \"STDERR: $_\n\";",
+                          "}",
+                         };
+        // Copy the scripts to HDFS
+        File command1 = Util.createInputFile("script", "pl", script);
+        File command2 = Util.createInputFile("script", "pl", script);
+        String c1 = FileLocalizer.hadoopify(command1.toString(), 
+                                            pigServer.getPigContext());
+        String c2 = FileLocalizer.hadoopify(command2.toString(), 
+                                            pigServer.getPigContext());
+        
+        // Expected results
+        String[] expectedFirstFields = 
+            new String[] {"A", "B", "C", "A", "D", "A"};
+        int[] expectedSecondFields = new int[] {5, 5, 8, 8, 8, 9};
+        Tuple[] expectedResults = 
+            setupExpectedResults(expectedFirstFields, expectedSecondFields);
+
+        // Pig query to run
+        pigServer.registerQuery(
+                "define CMD1 `script1.pl foo` " +
+                "cache ('" + c1 + "#script1.pl') " +
+                "input('foo' using " + PigStorage.class.getName() + "(',')) " +
+                "stderr();"); 
+        pigServer.registerQuery(
+                "define CMD2 `script2.pl bar` " +
+                "cache ('" + c2 + "#script2.pl') " +
+                "input('bar' using " + PigStorage.class.getName() + "(',')) " +
+                "stderr();"); 
+        pigServer.registerQuery("IP = load 'file:" + input + "' using " + 
+                                PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+        pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
+                                "through CMD1;");
+        pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");
+
+        String output = "/pig/out";
+        pigServer.deleteFile(output);
+        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+        
+        InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
+        PigStorage ps = new PigStorage(",");
+        ps.bindTo("", new BufferedPositionedInputStream(op), 0, 
Long.MAX_VALUE); 
+        List<Tuple> outputs = new ArrayList<Tuple>();
+        Tuple t;
+        while ((t = ps.getNext()) != null) {
+            outputs.add(t);
+        }
+
+        // Run the query and check the results
+        Util.checkQueryOutputs(outputs.iterator(), expectedResults);
+    }
+
+    @Test
        public void testOutputShipSpecs() throws Exception {
         // FIXME : this should be tested in all modes
         if(execType == ExecType.LOCAL)


Reply via email to