Author: olga
Date: Mon Apr  7 15:57:21 2008
New Revision: 645726

URL: http://svn.apache.org/viewvc?rev=645726&view=rev
Log:
PIG-182: streaming hang bugfix

Modified:
    incubator/pig/trunk/CHANGES.txt
    
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
    
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
    
incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java
    
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
    
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
    
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
    incubator/pig/trunk/src/org/apache/pig/impl/util/JarManager.java
    incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=645726&r1=645725&r2=645726&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Mon Apr  7 15:57:21 2008
@@ -203,3 +203,5 @@
     PIG-174,180: bug fixes in streaming (arunc via olgan)
 
     PIG-181: streaming bug fixing (arunc via olgan)
+
+    PIG-182: streaming bug fix (arunc via olgan)

Modified: 
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java?rev=645726&r1=645725&r2=645726&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
 (original)
+++ 
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
 Mon Apr  7 15:57:21 2008
@@ -51,7 +51,9 @@
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.PigFile;
 import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -144,24 +146,11 @@
             funcs.addAll(pom.toReduce.getFuncs());
         }
         
-        String shipFiles = 
-            pom.properties.getProperty("pig.streaming.ship.files");
-        List<String> files = new ArrayList<String>(); 
-        if (shipFiles != null) {
-            String[] paths = shipFiles.split(",");
-            for (String path : paths) {
-                path = path.trim();
-                if (path.length() > 0) {
-                    files.add(path.trim());
-                }
-            }
-        }
-
         // create jobs.jar locally and pass it to hadoop
         File submitJarFile = File.createTempFile("Job", ".jar");    
         try {
             FileOutputStream fos = new FileOutputStream(submitJarFile);
-            JarManager.createJar(fos, funcs, files, pom.pigContext);
+            JarManager.createJar(fos, funcs, null, pom.pigContext);
             log.debug("Job jar size = " + submitJarFile.length());
             conf.setJar(submitJarFile.getPath());
             String user = System.getProperty("user.name");
@@ -219,26 +208,10 @@
             conf.set("pig.storeFunc", pom.outputFileSpec.getFuncSpec());
 
             // Setup the DistributedCache for this job
-            DistributedCache.createSymlink(conf);
-            
-            String cacheFiles = 
-                pom.properties.getProperty("pig.streaming.cache.files");
-            if (cacheFiles != null) {
-                String[] paths = cacheFiles.split(",");
-                
-                for (String path : paths) {
-                    path = path.trim();
-                    if (path.length() != 0) {
-                        URI uri = null;
-                        try {
-                            uri = new URI(path);
-                        } catch (URISyntaxException ue) {
-                            throw new IOException("Invalid cache 
specification, file doesn't exist: " + path);
-                        }
-                        DistributedCache.addCacheFile(uri, conf);
-                    }
-                }
-            }
+            setupDistributedCache(pom.pigContext, conf, pom.properties, 
+                                  "pig.streaming.ship.files", true);
+            setupDistributedCache(pom.pigContext, conf, pom.properties, 
+                                  "pig.streaming.cache.files", false);
 
             //TODO - Remove this
             conf.setBoolean("keep.failed.task.files", true);
@@ -396,6 +369,58 @@
     throws IOException {
         for (Map.Entry property : pom.properties.entrySet()) {
             job.set((String)property.getKey(), (String)property.getValue());
+        }
+    }
+    
+    private static void setupDistributedCache(PigContext pigContext,
+                                              Configuration conf, 
+                                              Properties properties, String 
key, 
+                                              boolean shipToCluster) 
+    throws IOException {
+        // Turn on the symlink feature
+        DistributedCache.createSymlink(conf);
+
+        // Set up the DistributedCache for this job        
+        String fileNames = properties.getProperty(key);
+        if (fileNames != null) {
+            String[] paths = fileNames.split(",");
+            
+            for (String path : paths) {
+                path = path.trim();
+                if (path.length() != 0) {
+                    Path src = new Path(path);
+                    
+                    // Ensure that 'src' is a valid URI
+                    URI srcURI = null;
+                    try {
+                        srcURI = new URI(src.toString());
+                    } catch (URISyntaxException ue) {
+                        throw new IOException("Invalid cache specification, " +
+                                                     "file doesn't exist: " + 
src);
+                    }
+                    
+                    // Ship it to the cluster if necessary and add to the
+                    // DistributedCache
+                    if (shipToCluster) {
+                        Path dst = 
+                            new Path(FileLocalizer.getTemporaryPath(null, 
pigContext).toString());
+                        FileSystem fs = dst.getFileSystem(conf);
+                        fs.copyFromLocalFile(src, dst);
+                        
+                        // Construct the dst#srcName uri for DistributedCache
+                        URI dstURI = null;
+                        try {
+                            dstURI = new URI(dst.toString() + "#" + 
src.getName());
+                        } catch (URISyntaxException ue) {
+                            throw new IOException("Invalid ship specification, 
" +
+                                                  "file doesn't exist: " + 
dst);
+                        }
+                        DistributedCache.addCacheFile(dstURI, conf);
+                    } else {
+                        DistributedCache.addCacheFile(srcURI, conf);
+                    }
+                }
+            }
         }
     }
     

Modified: 
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java?rev=645726&r1=645725&r2=645726&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
 (original)
+++ 
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
 Mon Apr  7 15:57:21 2008
@@ -162,9 +162,11 @@
             evalPipe.add(t);
         } catch (Throwable tr) {
             log.error(tr);
-            RuntimeException exp = new RuntimeException(tr.getMessage());
-            exp.setStackTrace(tr.getStackTrace());
-            throw exp;
+            
+            // Convert to IOException to ensure Hadoop handles it correctly ...
+            IOException ioe = new IOException(tr.getMessage());
+            ioe.setStackTrace(tr.getStackTrace());
+            throw ioe;
         }
     }
 
@@ -197,8 +199,17 @@
      * Nothing happens here.
      */
     public void close() throws IOException {
+        try {
         if (evalPipe!=null)
             evalPipe.finishPipe();
+        } catch (Throwable t) {
+            log.error(t);
+            
+            // Convert to IOException to ensure Hadoop handles it correctly ...
+            IOException ioe = new IOException(t.getMessage());
+            ioe.setStackTrace(t.getStackTrace());
+            throw ioe;
+        }
     }
 
     public static PigContext getPigContext() {
@@ -308,13 +319,23 @@
     }
     
     public void closeSideFiles(){
+        IOException ioe = null;
         for (PigRecordWriter writer: sideFileWriters){
             try{
                 writer.close(reporter);
             }catch(IOException e){
                 log.error(e);
+                
+                // Save the first IOException which occurred ...
+                if (ioe == null) {
+                    ioe = e;
+                }
             }
         }
+        
+        if (ioe != null) {
+            throw new RuntimeException(ioe);
+        }
     }
 
     class MapDataOutputCollector extends DataCollector {
@@ -340,9 +361,26 @@
         
         @Override
         public void finish(){
-            closeSideFiles();
-            if (group != null)
-                group.finish();
+            try {
+                closeSideFiles();
+                
+                if (group != null) {
+                    group.finish();
+                    group = null;
+                }
+            } catch (Exception e) {
+                try {
+                    if (group != null) {
+                        group.finish();
+                        group = null;
+                    }
+                } catch (Exception innerE) {
+                    log.warn("Failed to cleanup groups with: " + innerE);
+                }
+                
+                // Propagate the original exception
+                throw new RuntimeException(e);
+            }
         }
     }
 

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java?rev=645726&r1=645725&r2=645726&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java 
(original)
+++ 
incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java 
Mon Apr  7 15:57:21 2008
@@ -129,17 +129,22 @@
     public final void finishPipe() {
         try {
             finish();
-        } finally {
+            
+            if (successor != null) {
+                successor.finishPipe();
+                successor = null;
+            }
+        } catch (Exception e) {
             try {
                 if (successor != null) {
                     successor.finishPipe();
                 } 
-            } catch (Exception e) {
+            } catch (Exception ignored) {
                 // Ignore this exception since the original is more relevant
-                LOG.debug(e);
-            } finally {
-                successor = null;
+                LOG.debug(ignored);
             }
+            successor = null;
+            throw new RuntimeException(e);
         }
     }
     

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java?rev=645726&r1=645725&r2=645726&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
 (original)
+++ 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
 Mon Apr  7 15:57:21 2008
@@ -81,8 +81,19 @@
                 
                 LoadFunc inputLoader = 
(LoadFunc)PigContext.instantiateFuncFromSpec(
                                                 loadFileSpec.getFuncSpec());
+
+                // Check if both LoadFunc objects belong to the same type
+                boolean sameType = false;
+                try {
+                    streamLoader.getClass().cast(inputLoader);
+                    sameType = true;
+                } catch (ClassCastException cce) {
+                    sameType = false;
+                }
                 
-                if (streamLoader.equals(inputLoader)) {
+                // Check if both LoadFunc objects belong to the same type and
+                // are equivalent
+                if (sameType && streamLoader.equals(inputLoader)) {
                     // Since they both are the same, we can flip them 
                     // for BinaryStorage
                     load.setInputFileSpec(new 
FileSpec(loadFileSpec.getFileName(), BinaryStorage.class.getName()));

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java?rev=645726&r1=645725&r2=645726&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
 (original)
+++ 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
 Mon Apr  7 15:57:21 2008
@@ -107,7 +107,19 @@
                 StoreFunc outputStorer = 
(StoreFunc)PigContext.instantiateFuncFromSpec(
                                                 storeFileSpec.getFuncSpec());
                 
-                if (streamStorer.equals(outputStorer)) {
+
+                // Check if both LoadFunc objects belong to the same type
+                boolean sameType = false;
+                try {
+                    streamStorer.getClass().cast(outputStorer);
+                    sameType = true;
+                } catch (ClassCastException cce) {
+                    sameType = false;
+                }
+                
+                // Check if both LoadFunc objects belong to the same type and
+                // are equivalent
+                if (sameType && streamStorer.equals(outputStorer)) {
                     // Since they both are the same, we can flip them 
                     // for BinaryStorage
                     s.setOutputFileSpec(new 
FileSpec(storeFileSpec.getFileName(), BinaryStorage.class.getName()));

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=645726&r1=645725&r2=645726&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
(original)
+++ 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
Mon Apr  7 15:57:21 2008
@@ -251,59 +251,66 @@
      private static final String PERL = "perl";
      private static final String PYTHON = "python";
      private void checkAutoShipSpecs(StreamingCommand command, String[] argv) {
+       // Candidate for auto-ship
        String arg0 = argv[0];
-       // Check if command is perl or python ...
+       
+       // Check if command is perl or python ... if so use the first non-option
+       // and non-quoted string as the candidate
         if (arg0.equalsIgnoreCase(PERL) || arg0.equalsIgnoreCase(PYTHON)) {
             for (int i=1; i < argv.length; ++i) {
                if (!argv[i].startsWith("-") && !isQuotedString(argv[i])) {
-                       command.addPathToShip(argv[i]);
-                       command.setExecutable(argv[i]);
+                       checkAndShip(command, argv[i]);
                        break;
                }
             }
         } else {
                // Ship the first argument if it can be ...
-               boolean absPath = arg0.startsWith("/");
-               String filePath =  (absPath) ? arg0 : which(arg0);
-               if (filePath.length() > 0 && checkAndShip(command, filePath)) {
-                       // Make it relative to task's cwd
-                       String runtimeExecutablePath = (absPath) ? ("." + 
filePath) : filePath; 
-                       argv[0] = runtimeExecutablePath;
-                       command.setCommandArgs(argv);
-                       command.setExecutable(runtimeExecutablePath);
-                       
-                       // Ship the file 
-                command.addPathToShip(filePath);
-               }
+               checkAndShip(command, arg0);
         }
      }
      
+     private void checkAndShip(StreamingCommand command, String arg) {
+       // Don't auto-ship if it is an absolute path...
+       if (arg.startsWith("/")) {
+               return;
+       }
+       
+       // $ which arg
+       String argPath = which(arg);
+       if (argPath != null && !inSkipPaths(argPath)) {
+               command.addPathToShip(argPath);
+       }
+        
+     }
+     
      private boolean isQuotedString(String s) {
        return (s.charAt(0) == '\'' && s.charAt(s.length()-1) == '\'');
      }
      
-     private boolean checkAndShip(StreamingCommand command, String file) {
-       // Check if file is in the paths to be skipped 
+     // Check if file is in the list paths to be skipped 
+     private boolean inSkipPaths(String file) {
        for (String skipPath : pigContext.getPathsToSkip()) {
                if (file.startsWith(skipPath)) {
-                       return false;
+                       return true;
                }
        }
-        return true;
+        return false;
      }
 
 
      private static String which(String file) {
         try {
-               ProcessBuilder processBuilder = new ProcessBuilder(new String[] 
{"which", file});
+               ProcessBuilder processBuilder = 
+                   new ProcessBuilder(new String[] {"which", file});
             Process process = processBuilder.start();
     
-            BufferedReader stdout = new BufferedReader(new 
InputStreamReader(process.getInputStream()));
+            BufferedReader stdout = 
+                new BufferedReader(new 
InputStreamReader(process.getInputStream()));
             String fullPath = stdout.readLine();
 
-            return (process.waitFor() == 0) ? fullPath : "";
+            return (process.waitFor() == 0) ? fullPath : null;
         } catch (Exception e) {}
-        return "";
+        return null;
      }
                
      private static final char SINGLE_QUOTE = '\'';
@@ -1505,6 +1512,10 @@
         }
          
         LogicalPlan readFrom = aliases.get(t.image);
+        if (readFrom == null) {
+            throw new ParseException("Undefined alias: " + t.image + 
+                                     " used in STORE");
+        }
         String jobOutputFile = massageFilename(fileName, pigContext);
         lo = new LOStore(opTable, scope, getNextId(), readFrom.getRoot(),
                          new FileSpec(jobOutputFile, functionSpec),

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=645726&r1=645725&r2=645726&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java 
(original)
+++ 
incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java 
Mon Apr  7 15:57:21 2008
@@ -22,9 +22,13 @@
 import java.io.BufferedReader;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
@@ -51,7 +55,9 @@
        private static final Log LOG = 
                LogFactory.getLog(ExecutableManager.class.getName());
     private static final int SUCCESS = 0;
-
+    private static final String PATH = "PATH";
+    private static final String BASH = "bash";
+    
        protected StreamingCommand command;        // Streaming command to be 
run
        String[] argv;                             // Parsed/split commands
 
@@ -190,21 +196,66 @@
                    stderrThread.interrupt();
                }
            }
-           
-
        }
 
+       /**
+        * Set up the run-time environment of the managed process.
+        * 
+        * @param pb [EMAIL PROTECTED] ProcessBuilder} used to exec the process
+        */
+       protected void setupEnvironment(ProcessBuilder pb) {
+           String separator = ":";
+           Map<String, String> env = pb.environment();
+           
+           // Add the current-working-directory to the $PATH
+           File dir = pb.directory();
+           String cwd = (dir != null) ? 
+                   dir.getAbsolutePath() : System.getProperty("user.dir");
+           String envPath = env.get(PATH);
+           if (envPath == null) {
+               envPath = cwd;
+           } else {
+               envPath = envPath + separator + cwd;
+           }
+           env.put(PATH, envPath);
+       }
+       
+       /**
+        * Start execution of the external process.
+        * 
+        * This takes care of setting up the environment of the process and also
+        * starts [EMAIL PROTECTED] ProcessErrorThread} to process the 
<code>stderr</code> of
+        * the managed process.
+        * 
+        * @throws IOException
+        */
        protected void exec() throws IOException {
-           // Unquote command-line arguments ...
+           // Set the actual command to run with 'bash -c exec ...'
+        List<String> cmdArgs = new ArrayList<String>();
+        cmdArgs.add(BASH);
+        cmdArgs.add("-c");
+        StringBuffer sb = new StringBuffer();
+        sb.append("exec ");
            for (int i=0; i < argv.length; ++i) {
+               // Single-quote each component, however ensure that already
+               // quoted args are handled right
+               sb.append('\'');
+               
                String arg = argv[i];
                if (arg.charAt(0) == '\'' && arg.charAt(arg.length()-1) == 
'\'') {
-                   argv[i] = arg.substring(1, arg.length()-1);
+                   arg = arg.substring(1, arg.length()-1);
                }
+               sb.append(arg);
+               
+            sb.append('\'');
+            sb.append(" ");
            }
+           cmdArgs.add(sb.toString());
            
         // Start the external process
-        ProcessBuilder processBuilder = new ProcessBuilder(argv);
+        ProcessBuilder processBuilder = 
+            new ProcessBuilder(cmdArgs.toArray(new String[cmdArgs.size()]));
+        setupEnvironment(processBuilder);
         process = processBuilder.start();
         LOG.debug("Started the process for command: " + command);
         

Modified: incubator/pig/trunk/src/org/apache/pig/impl/util/JarManager.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=645726&r1=645725&r2=645726&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/util/JarManager.java Mon Apr  7 
15:57:21 2008
@@ -131,8 +131,10 @@
             new ObjectOutputStream(jarFile).writeObject(pigContext);
         }
         
-        for (String file : files) {
-            addStream(jarFile, file, new FileInputStream(file), contents);
+        if (files != null) {
+            for (String file : files) {
+                addStream(jarFile, file, new FileInputStream(file), contents);
+            }
         }
         
         jarFile.close();

Modified: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=645726&r1=645725&r2=645726&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Mon Apr  7 
15:57:21 2008
@@ -270,7 +270,7 @@
 
         // Pig query to run
         pigServer.registerQuery(
-                "define CMD `." + command + " foo` " +
+                "define CMD `" + command.getName() + " foo` " +
                 "ship ('" + command + "') " +
                 "input('foo' using " + PigStorage.class.getName() + "(',')) " +
                 "stderr();"); 
@@ -320,7 +320,7 @@
                           "}",
                             };
            File command = Util.createInputFile("script", "pl", script);
-           
+
         // Expected results
         String[] expectedFirstFields = 
             new String[] {"A", "A", "A", "A", "A", "A"};
@@ -330,7 +330,7 @@
 
         // Pig query to run
         pigServer.registerQuery(
-                "define CMD `." + command + " foo bar` " +
+                "define CMD `" + command.getName() + " foo bar` " +
                 "ship ('" + command + "') " +
                        "output('foo' using " + PigStorage.class.getName() + 
"(','), " +
                        "'bar' using " + PigStorage.class.getName() + "(',')) " 
+
@@ -358,7 +358,7 @@
     }
 
     @Test
-    public void testInputOutputSpecsWithAutoShip() throws Exception {
+    public void testInputOutputSpecs() throws Exception {
         PigServer pigServer = new PigServer(MAPREDUCE);
 
         File input = Util.createInputFile("tmp", "", 
@@ -382,7 +382,7 @@
                           "}",
                          };
         File command = Util.createInputFile("script", "pl", script);
-        
+
         // Expected results
         String[] expectedFirstFields = 
             new String[] {"A", "B", "C", "A", "D", "A"};
@@ -392,7 +392,7 @@
 
         // Pig query to run
         pigServer.registerQuery(
-                "define CMD `." + command + " foo bar foobar` " +
+                "define CMD `" + command.getName() + " foo bar foobar` " +
                 "ship ('" + command + "') " +
                 "input('foo' using " + PigStorage.class.getName() + "(',')) " +
                 "output('bar' using " + PigStorage.class.getName() + "(','), " 
+


Reply via email to