Author: yanz
Date: Mon May 17 22:17:50 2010
New Revision: 945406

URL: http://svn.apache.org/viewvc?rev=945406&view=rev
Log:
PIG-1421 Name node calls made by each mapper (xuefuz via yanz)

Modified:
    hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java

Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt?rev=945406&r1=945405&r2=945406&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt Mon May 17 
22:17:50 2010
@@ -78,6 +78,8 @@ Trunk (unreleased changes)
 
   BUG FIXES
 
+    PIG-1421 Name node calls made by each mapper (xuefuz via yanz)
+
     PIG-1356 TableLoader makes unnecessary calls to build a Job instance that 
create a new JobClient in the hadoop 0.20.9 (yanz)
 
     PIG-1349 Hubson test failure in test case TestBasicUnion (xuefuz via yanz)

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=945406&r1=945405&r2=945406&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
 Mon May 17 22:17:50 2010
@@ -258,7 +258,6 @@ public class BasicTable {
         }
         partition.setSource(cgTuples);
         inferredMapping = true;
-        buildStatus();
       }
       else {
         // the projection is not changed, so we do not need to recalculate the
@@ -317,8 +316,6 @@ public class BasicTable {
           else
             cgTuples[nx] = null;
         }
-        if (schemaFile.isSorted())
-          buildStatus();
         closed = false;
       }
       catch (Exception e) {
@@ -753,6 +750,12 @@ public class BasicTable {
       return schemaFile.getDeletedCGs();
     }
 
+    public static String getDeletedCGs(Path path, Configuration conf)
+    throws IOException {
+       SchemaFile schF = new SchemaFile(path, new String[0], conf);
+       return schF.getDeletedCGs();
+    }
+
     private void buildStatus() throws IOException {
       status = new BasicTableStatus();
       if (firstValidCG >= 0) {

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java?rev=945406&r1=945405&r2=945406&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
 Mon May 17 22:17:50 2010
@@ -190,9 +190,43 @@ public class TableInputFormat extends In
       }
       setInputExpr(conf, expr);
     }
+    
+    setDeletedCGsInConf( conf, paths );
   }
   
   /**
+   * Temporary fix for name node call in each mapper. It sets two flags in the 
conf so that mapper can skip
+   * the work that's done here at frontend.
+   * 
+   * It needs to check if the flag is already set (because setInputPaths() is 
also called on the backend
+   * thru pig code path (thru TableLoader.setLocation()).
+   * 
+   * @param conf
+   * @param paths
+   */
+  private static void setDeletedCGsInConf(Configuration conf, Path[] paths) {
+      if( !conf.get( INPUT_FE, "false" ).equals( "true" )  ) {
+          try {
+              StringBuilder sb = new StringBuilder();
+              boolean first = true;
+              for( Path p : paths ) {
+                  if (first)
+                      first = false;
+                  else
+                      sb.append( DELETED_CG_SEPARATOR_PER_UNION );
+                  sb.append( BasicTable.Reader.getDeletedCGs( p, conf ) );
+              }
+
+              conf.set(INPUT_FE, "true");
+              conf.set(INPUT_DELETED_CGS, sb.toString());
+          } catch(Exception ex) {
+              throw new RuntimeException( "Failed to find deleted column 
groups" + ex.toString() );
+          }
+      }
+  }
+
+  
+  /**
    * Set the input expression in the Configuration object.
    * 
    * @param conf
@@ -954,28 +988,17 @@ public class TableInputFormat extends In
                new ArrayList<BasicTableStatus>(nLeaves);
 
        try {
-        StringBuilder sb = new StringBuilder();
         boolean sorted = expr.sortedSplitRequired();
-        boolean first = true;
                for (Iterator<LeafTableInfo> it = leaves.iterator(); 
it.hasNext();) {
                        LeafTableInfo leaf = it.next();
                        BasicTable.Reader reader =
                                new BasicTable.Reader(leaf.getPath(), conf );
                        reader.setProjection(leaf.getProjection());
                        BasicTableStatus s = reader.getStatus();
-                 status.add(s);
+                   status.add(s);
                        readers.add(reader);
-          if (first)
-            first = false;
-          else {
-            sb.append(TableInputFormat.DELETED_CG_SEPARATOR_PER_UNION);
-          }
-          sb.append(reader.getDeletedCGs());
                }
 
-        conf.set(INPUT_FE, "true");
-        conf.set(INPUT_DELETED_CGS, sb.toString());
-
                if( readers.isEmpty() ) {
                        // I think we should throw exception here.
                        return new ArrayList<InputSplit>();

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=945406&r1=945405&r2=945406&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
 Mon May 17 22:17:50 2010
@@ -72,6 +72,15 @@ public class TableLoader extends LoadFun
 
     private static final String UDFCONTEXT_PROJ_STRING = 
"zebra.UDFContext.projectionString";
     private static final String UDFCONTEXT_GLOBAL_SORTING = 
"zebra.UDFContext.globalSorting";
+    private static final String UDFCONTEXT_PATHS_STRING = 
"zebra.UDFContext.pathsString";
+    private static final String UDFCONTEXT_INPUT_DELETED_CGS = 
"zebra.UDFContext.deletedcgs";
+    private static final String INPUT_SORT = "mapreduce.lib.table.input.sort";
+    private static final String INPUT_SPLIT_MODE = 
"mapreduce.lib.table.input.split_mode";
+
+    private static final String INPUT_FE = "mapreduce.lib.table.input.fe";
+    private static final String INPUT_DELETED_CGS = 
"mapreduce.lib.table.input.deleted_cgs";
+    private static final String GLOBALLY_SORTED = "globally_sorted";
+    private static final String LOCALLY_SORTED = "locally_sorted";
 
     private String projectionString;
 
@@ -190,8 +199,36 @@ public class TableLoader extends LoadFun
            sortInfo = TableInputFormat.getSortInfo( job );
        }       
     }
-    
-    
+
+    /**
+     * This is a light version of setSortOrder, which is called in 
getLocation(), which is also called at backend.
+     * We need to do this to avoid name node call in mappers. Original 
setSortOrder() is stilled called (in
+     * getSchema()), so it's okay to skip the checks that are performed when 
setSortOrder() is called.
+     * 
+     * This will go away once we have a better solution.
+     * 
+     * @param job
+     * @throws IOException
+     */
+    private void setSortOrderLight(Job job) throws IOException {
+        Properties properties = UDFContext.getUDFContext().getUDFProperties( 
+               this.getClass(), new String[]{ udfContextSignature } );
+        boolean requireGlobalOrder = "true".equals(properties.getProperty( 
UDFCONTEXT_GLOBAL_SORTING));
+        if (requireGlobalOrder && !sorted)
+          throw new IOException("Global sorting can be only asked on table 
loaded as sorted");
+        if( sorted ) {
+            SplitMode splitMode = 
+                requireGlobalOrder ? SplitMode.GLOBALLY_SORTED : 
SplitMode.LOCALLY_SORTED;
+
+            Configuration conf = job.getConfiguration();
+            conf.setBoolean(INPUT_SORT, true);
+            if (splitMode == SplitMode.GLOBALLY_SORTED)
+                conf.set(INPUT_SPLIT_MODE, GLOBALLY_SORTED);
+            else if (splitMode == SplitMode.LOCALLY_SORTED)
+                conf.set(INPUT_SPLIT_MODE, LOCALLY_SORTED);
+        }        
+     }
+
     /**
      * This method sets projection.
      * 
@@ -289,16 +326,50 @@ public class TableLoader extends LoadFun
              throw new IOException( "Invalid object type passed to 
TableLoader" );
      }
 
+     /**
+      * This method is called by pig on both frontend and backend.
+      */
      @Override
      public void setLocation(String location, Job job) throws IOException {
-         Path[] paths = getPathsFromLocation( location, job );
-         TableInputFormat.setInputPaths( job, paths );
+         Properties properties = UDFContext.getUDFContext().getUDFProperties( 
+                 this.getClass(), new String[]{ udfContextSignature } );
 
+         // Retrieve paths from UDFContext to avoid name node call in mapper.
+         String pathString = properties.getProperty( UDFCONTEXT_PATHS_STRING );
+         Path[]paths = deserializePaths( pathString );
+         
+         // Retrieve deleted column group information to avoid name node call 
in mapper.
+         String deletedCGs = properties.getProperty( 
UDFCONTEXT_INPUT_DELETED_CGS );
+         job.getConfiguration().set(INPUT_FE, "true");
+         job.getConfiguration().set(INPUT_DELETED_CGS, deletedCGs );
+         
+         TableInputFormat.setInputPaths( job, paths );
+         
          // The following obviously goes beyond of set location, but this is 
the only place that we
          // can do and it's suggested by Pig team.
-         setSortOrder( job );
+         setSortOrderLight( job );
          setProjection( job );
      }
+     
+     private static String serializePaths(Path[] paths) {
+         StringBuilder sb = new StringBuilder();
+         for( int i = 0; i < paths.length; i++ ) {
+             sb.append( paths[i].toString() );
+             if( i < paths.length -1 ) {
+                 sb.append( ";" );
+             }
+         }
+         return sb.toString();
+     }
+     
+     private static Path[] deserializePaths(String val) {
+         String[] paths = val.split( ";" );
+         Path[] result = new Path[paths.length];
+         for( int i = 0; i < paths.length; i++ ) {
+             result[i] = new Path( paths[i] );
+         }
+         return result;
+     }
 
      @SuppressWarnings("unchecked")
      @Override
@@ -314,9 +385,19 @@ public class TableLoader extends LoadFun
 
      @Override
      public ResourceSchema getSchema(String location, Job job) throws 
IOException {
-         Path[] paths = getPathsFromLocation( location, job);
+         Properties properties = UDFContext.getUDFContext().getUDFProperties( 
+                 this.getClass(), new String[]{ udfContextSignature } );
+         
+         // Save the paths in UDFContext so that it can be retrieved in 
setLocation().
+         Path[] paths = getPathsFromLocation( location, job );
+         properties.setProperty( UDFCONTEXT_PATHS_STRING, serializePaths( 
paths ) );
+         
          TableInputFormat.setInputPaths( job, paths );
 
+         // Save the deleted column group information in UDFContext so that it 
can be used in setLocation().
+         // Property INPUT_DELETED_CGS is set in 
TableInputFormat.setInputPaths(). We just retrieve it here.
+         properties.setProperty( UDFCONTEXT_INPUT_DELETED_CGS,  
job.getConfiguration().get(INPUT_DELETED_CGS) );
+
          Schema tableSchema = null;
          if( paths.length == 1 ) {
              tableSchema = BasicTable.Reader.getSchema( paths[0], 
job.getConfiguration() );


Reply via email to