Author: thejas
Date: Wed Sep 29 15:46:05 2010
New Revision: 1002692

URL: http://svn.apache.org/viewvc?rev=1002692&view=rev
Log:
PIG-1649: FRJoin fails to compute number of input files for replicated input 

Added:
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/util/UriUtil.java
Modified:
    hadoop/pig/branches/branch-0.8/CHANGES.txt
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java
    hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java

Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/CHANGES.txt?rev=1002692&r1=1002691&r2=1002692&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.8/CHANGES.txt Wed Sep 29 15:46:05 2010
@@ -198,6 +198,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1649: FRJoin fails to compute number of input files for replicated input (thejas)
+
 PIG-1637: Combiner not use because optimizor inserts a foreach between group
 and algebric function (daijy)
 

Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1002692&r1=1002691&r2=1002692&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Sep 29 15:46:05 2010
@@ -47,6 +47,7 @@ import org.apache.hadoop.util.RunJar;
 import org.apache.pig.ComparisonFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
+import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -88,6 +89,7 @@ import org.apache.pig.impl.util.JarManag
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.UriUtil;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.ScriptState;
 
@@ -709,13 +711,23 @@ public class JobControlCompiler{
         long size = 0;
         FileSystem fs = FileSystem.get(conf);
         for (String input : inputs){
-            Path path = new Path(input);
-           String schema = path.toUri().getScheme();
-            if (schema==null || schema.equalsIgnoreCase("hdfs") || schema.equalsIgnoreCase("file")){
-                FileStatus[] status=fs.globStatus(new Path(input));
+            //Using custom uri parsing because 'new Path(location).toUri()' fails
+            // for some valid uri's (eg jdbc style), and 'new URI(location)' fails
+            // for valid hdfs paths that contain curly braces
+            if(!UriUtil.isHDFSFileOrLocal(input)){
+                //skip if it is not an hdfs or local file
+                continue;
+            }
+            //the input file location might be a list of comma separated files,
+            // separate them out
+            for(String location : LoadFunc.getPathStrings(input)){
+                if(!UriUtil.isHDFSFileOrLocal(location)){
+                    continue;
+                }
+                FileStatus[] status=fs.globStatus(new Path(location));
                 if (status != null){
                     for (FileStatus s : status){
-                       size += getPathLength(fs, s);
+                        size += getPathLength(fs, s);
                     }
                 }
             }
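
The rewritten loop leans on LoadFunc.getPathStrings to split a comma-separated
load location without breaking curly-brace globs, which a plain split on ","
would. A minimal sketch of the expected behaviour (the sample paths and the
demo class are invented for illustration):

    import org.apache.pig.LoadFunc;

    public class PathSplitDemo {
        public static void main(String[] args) {
            // A top-level comma separates two locations ...
            for (String s : LoadFunc.getPathStrings("/data/a,/data/b"))
                System.out.println(s);   // prints "/data/a", then "/data/b"
            // ... but a comma inside a {} glob belongs to one location.
            for (String s : LoadFunc.getPathStrings("/data/{2010,2011}/part-*"))
                System.out.println(s);   // prints the glob unchanged, one entry
        }
    }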

Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1002692&r1=1002691&r2=1002692&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Sep 29 15:46:05 2010
@@ -19,8 +19,6 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -34,7 +32,6 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -80,7 +77,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
@@ -98,7 +94,6 @@ import org.apache.pig.impl.builtin.Poiss
 import org.apache.pig.impl.builtin.RandomSampleLoader;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -112,7 +107,9 @@ import org.apache.pig.impl.util.Compiler
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.UriUtil;
 import org.apache.pig.impl.util.Utils;
+import org.mortbay.util.URIUtil;
 
 /**
  * The compiler that compiles a given physical plan
@@ -1279,48 +1276,54 @@ public class MRCompiler extends PhyPlanV
         try {
             for (PhysicalOperator root : roots) {
                 POLoad ld = (POLoad) root;
-                String location = ld.getLFile().getFileName();
-                URI uri = new URI(location);
-                if (uri.getScheme() == null
-                        || uri.getScheme().equalsIgnoreCase("hdfs")) {
-                    Path p = new Path(location);                   
-                    FileSystem fs = p.getFileSystem(conf);
-                    if (fs.exists(p)) {
-                        LoadFunc loader = (LoadFunc) PigContext
-                                .instantiateFuncFromSpec(ld.getLFile()
-                                        .getFuncSpec());
-                        Job job = new Job(conf);
-                        loader.setLocation(location, job);
-                        InputFormat inf = loader.getInputFormat();
-                        List<InputSplit> splits = inf.getSplits(new JobContext(
-                                job.getConfiguration(), job.getJobID()));
-                        List<List<InputSplit>> results = MapRedUtil
-                                .getCombinePigSplits(splits, fs
-                                        .getDefaultBlockSize(), conf);
-                        numFiles += results.size();
-                    } else {
-                        List<MapReduceOper> preds = MRPlan.getPredecessors(mro);
-                        if (preds != null && preds.size() == 1) {
-                            MapReduceOper pred = preds.get(0);
-                            if (!pred.reducePlan.isEmpty()) { 
-                                numFiles += pred.requestedParallelism;  
-                            } else { // map-only job
-                                ret = hasTooManyInputFiles(pred, conf);
+                String fileName = ld.getLFile().getFileName();
+                
+                if(UriUtil.isHDFSFile(fileName)){
+                    // Only if the input is an hdfs file, this optimization is 
+                    // useful (to reduce load on namenode)
+                    
+                    //separate out locations separated by comma
+                    String [] locations = LoadFunc.getPathStrings(fileName);
+                    for(String location : locations){
+                        if(!UriUtil.isHDFSFile(location))
+                            continue;
+                        Path path = new Path(location);
+                        FileSystem fs = path.getFileSystem(conf);
+                        if (fs.exists(path)) {
+                            LoadFunc loader = (LoadFunc) PigContext
+                            .instantiateFuncFromSpec(ld.getLFile()
+                                    .getFuncSpec());
+                            Job job = new Job(conf);
+                            loader.setLocation(location, job);
+                            InputFormat inf = loader.getInputFormat();
+                            List<InputSplit> splits = inf.getSplits(new JobContext(
+                                    job.getConfiguration(), job.getJobID()));
+                            List<List<InputSplit>> results = MapRedUtil
+                            .getCombinePigSplits(splits, fs
+                                    .getDefaultBlockSize(), conf);
+                            numFiles += results.size();
+                        } else {
+                            List<MapReduceOper> preds = MRPlan.getPredecessors(mro);
+                            if (preds != null && preds.size() == 1) {
+                                MapReduceOper pred = preds.get(0);
+                                if (!pred.reducePlan.isEmpty()) { 
+                                    numFiles += pred.requestedParallelism;
+                                } else { // map-only job
+                                    ret = hasTooManyInputFiles(pred, conf);
+                                    break;
+                                }
+                            } else if (!optimisticFileConcatenation) {
+                                // can't determine the number of input files.
+                                // Treat it as having too many files
+                                numFiles = fileConcatenationThreshold;
                                 break;
                             }
-                        } else if (!optimisticFileConcatenation) {
-                            // can't determine the number of input files. 
-                            // Treat it as having too manyfiles
-                            numFiles = fileConcatenationThreshold;
-                            break;
                         }
                     }
                 }
             }
         } catch (IOException e) {
             LOG.warn("failed to get number of input files", e); 
-        } catch (URISyntaxException e) {
-            LOG.warn("failed to get number of input files", e); 
         } catch (InterruptedException e) {
             LOG.warn("failed to get number of input files", e); 
         }
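
Dropping java.net.URI is the heart of this hunk: a curly-brace glob is a legal
HDFS path but not a legal URI, so the old code threw URISyntaxException and
logged "failed to get number of input files" before anything was counted. A
small self-contained demonstration (the class name is invented):

    import java.net.URI;
    import java.net.URISyntaxException;

    public class BraceGlobDemo {
        public static void main(String[] args) {
            try {
                // Valid as an HDFS glob, rejected by java.net.URI:
                new URI("/data/{part-00,part-01}");
            } catch (URISyntaxException e) {
                System.out.println("rejected: " + e.getMessage());
            }
            // Hence UriUtil's plain prefix checks replace URI parsing here.
        }
    }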

Added: hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/util/UriUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/util/UriUtil.java?rev=1002692&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/util/UriUtil.java (added)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/util/UriUtil.java Wed Sep 29 15:46:05 2010
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.util;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class UriUtil {
+    public static boolean isHDFSFile(String uri){
+        if(uri == null)
+            return false;
+        if(uri.startsWith("/") || uri.startsWith("hdfs:")){
+            return true;
+        }
+        return false;
+    }
+
+    public static boolean isHDFSFileOrLocal(String uri){
+        if(uri == null)
+            return false;
+        if(uri.startsWith("/") || uri.startsWith("hdfs:") || uri.startsWith("file:")){
+            return true;
+        }
+        return false;
+    }
+    
+}
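
The helper deliberately trades URI parsing for two prefix checks, so its
behaviour is easy to pin down. A usage sketch (the harness class is invented;
the expected values follow from the code above):

    import org.apache.pig.impl.util.UriUtil;

    public class UriUtilDemo {
        public static void main(String[] args) {
            System.out.println(UriUtil.isHDFSFile("/user/pig/in"));          // true
            System.out.println(UriUtil.isHDFSFile("hdfs://nn/in/{a,b}"));    // true, braces are fine
            System.out.println(UriUtil.isHDFSFile("file:///tmp/in"));        // false
            System.out.println(UriUtil.isHDFSFile("jdbc:mysql://db/t"));     // false
            System.out.println(UriUtil.isHDFSFileOrLocal("file:///tmp/in")); // true
            System.out.println(UriUtil.isHDFSFileOrLocal(null));             // false
        }
    }

Note that MRCompiler uses the stricter isHDFSFile, since the concatenation
optimization only pays off when the namenode is involved, while
JobControlCompiler's size estimate also accepts file: URIs via
isHDFSFileOrLocal.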

Modified: hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java?rev=1002692&r1=1002691&r2=1002692&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java (original)
+++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java Wed Sep 29 15:46:05 2010
@@ -19,6 +19,7 @@ package org.apache.pig.test;
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.File;
 import java.util.Iterator;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -47,8 +48,14 @@ public class TestFRJoin2 {
     private static final int FILE_MERGE_THRESHOLD = 5;
     private static final int MIN_FILE_MERGE_THRESHOLD = 1;
     
+    //names of the files in the input dir, joined by commas
+    private static String concatINPUT_DIR = null;
+    private File logFile;
+
+    
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
+        StringBuilder strBuilder = new StringBuilder();
         FileSystem fs = cluster.getFileSystem();
         fs.mkdirs(new Path(INPUT_DIR));
         int LOOP_SIZE = 2;
@@ -59,9 +66,14 @@ public class TestFRJoin2 {
                     input[n*LOOP_SIZE + j] = i + "\t" + (j + n);
                 }
             }
-            Util.createInputFile(cluster, INPUT_DIR + "/part-0000" + i, input);
+            String newFile = INPUT_DIR + "/part-0000" + i;
+            Util.createInputFile(cluster, newFile, input);
+            strBuilder.append(newFile);
+            strBuilder.append(",");
         }
-
+        strBuilder.deleteCharAt(strBuilder.length() - 1);
+        concatINPUT_DIR = strBuilder.toString();
+        
         String[] input2 = new String[2*(LOOP_SIZE/2)];
         int k = 0;
         for (int i=1; i<=LOOP_SIZE/2; i++) {
@@ -132,11 +144,12 @@ public class TestFRJoin2 {
     // a Map-only job
     @Test
     public void testConcatenateJobForScalar2() throws Exception {
+        logFile = Util.resetLog(MRCompiler.class, logFile);
         PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
                 .getProperties());
         
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as 
(x:int,y:int);");
-        pigServer.registerQuery("B = LOAD '" + INPUT_DIR + "' as 
(x:int,y:int);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_DIR +  "/{part-00*}" +"' 
as (x:int,y:int);");
         pigServer.registerQuery("C = filter B by (x == 3) AND (y == 2);");
         
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
@@ -157,6 +170,10 @@ public class TestFRJoin2 {
             JobStats js = (JobStats)jGraph.getSuccessors(jGraph.getSources().get(0)).get(0);
             assertEquals(1, js.getNumberMaps());   
             assertEquals(0, js.getNumberReduces()); 
+            Util.checkLogFileMessage(logFile, 
+                    new String[] {"number of input files: 0", "failed to get number of input files"},
+                    false
+            );
         }
         {
             pigServer.getPigContext().getProperties().setProperty(
@@ -224,11 +241,12 @@ public class TestFRJoin2 {
     
     @Test
     public void testConcatenateJobForFRJoin() throws Exception {
+        logFile = Util.resetLog(MRCompiler.class, logFile);
         PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
                 .getProperties());
         
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as 
(x:int,y:int);");
-        pigServer.registerQuery("B = LOAD '" + INPUT_DIR + "' as 
(x:int,y:int);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_DIR +  "/{part-00*}" + "' 
as (x:int,y:int);");
         
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
         {
@@ -243,6 +261,11 @@ public class TestFRJoin2 {
             }
             
             assertEquals(3, PigStats.get().getJobGraph().size());
+            Util.checkLogFileMessage(logFile, 
+                    new String[] {"number of input files: 0", "failed to get number of input files"},
+                    false
+            );
+
         }
         {
             pigServer.getPigContext().getProperties().setProperty(
@@ -259,7 +282,7 @@ public class TestFRJoin2 {
         }
         
         assertEquals(dbfrj.size(), dbshj.size());
-        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));    
+        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));  
     }
             
     @Test
@@ -309,7 +332,7 @@ public class TestFRJoin2 {
     public void testUnknownNumMaps() throws Exception {
         PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         
-        pigServer.registerQuery("A = LOAD '" + INPUT_DIR + "' as 
(x:int,y:int);");
+        pigServer.registerQuery("A = LOAD '" + concatINPUT_DIR + "' as 
(x:int,y:int);");
         pigServer.registerQuery("B = Filter A by x < 50;");
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
         {

Modified: hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java?rev=1002692&r1=1002691&r2=1002692&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java Wed Sep 29 15:46:05 2010
@@ -26,6 +26,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
@@ -51,8 +52,11 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.SimpleLayout;
 import org.apache.pig.ExecType;
-import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -75,8 +79,6 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.parser.QueryParser;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanPrinter;
 import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
@@ -738,4 +740,64 @@ public class Util {
         reader.close();
         return tuples;
     }
+    
+    /**
+     * Delete the existing logFile for the class, redirect its logging to a
+     * new log file, and set the log level to DEBUG.
+     * @param clazz class for which the log file is being set
+     * @param logFile current log file
+     * @return new log file
+     * @throws Exception
+     */
+    public static File resetLog(Class<?> clazz, File logFile) throws Exception {
+        if (logFile != null)
+            logFile.delete();
+        Logger logger = Logger.getLogger(clazz);
+        logger.removeAllAppenders();
+        logger.setLevel(Level.DEBUG);
+        SimpleLayout layout = new SimpleLayout();
+        File newLogFile = File.createTempFile("log", "");
+        FileAppender appender = new FileAppender(layout, newLogFile.toString(),
+                        false, false, 0);
+        logger.addAppender(appender);
+        return newLogFile;
+    }
+
+    /**
+     * Check if logFile contains (or does not contain) the given list of messages.
+     * @param logFile
+     * @param messages
+     * @param expected if true, the messages are expected in the logFile;
+     *        otherwise the messages should not be present in the log
+     */
+    public static void checkLogFileMessage(File logFile, String[] messages, boolean expected) {
+        BufferedReader reader = null;
+
+        try {
+            reader = new BufferedReader(new FileReader(logFile));
+            String logMessage = "";
+            String line;
+            while ((line = reader.readLine()) != null) {
+                logMessage = logMessage + line + "\n";
+            }
+            for (int i = 0; i < messages.length; i++) {
+                boolean present = logMessage.contains(messages[i]);
+                if (expected) {
+                    if(!present){
+                        fail("The message " + messages[i] + " is not present 
in" +
+                                "log file contents: " + logMessage);
+                    }
+                }else{
+                    if(present){
+                        fail("The message " + messages[i] + " is present in" +
+                                "log file contents: " + logMessage);
+                    }
+                }
+            }
+            return ;
+        }
+        catch (IOException e) {
+            fail("caught exception while checking log message :" + e);
+        }
+    }
 }
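
Together, resetLog and checkLogFileMessage let a test make assertions about
log output; the pattern, as exercised by TestFRJoin2 above:

    // Capture MRCompiler's DEBUG logging into a fresh temp file ...
    logFile = Util.resetLog(MRCompiler.class, logFile);
    // ... run the query under test, then verify that the failure
    // messages never appeared (expected == false means "must NOT be present").
    Util.checkLogFileMessage(logFile,
            new String[] {"number of input files: 0",
                          "failed to get number of input files"},
            false);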

