Author: olga
Date: Mon Nov 23 21:59:39 2009
New Revision: 883515

URL: http://svn.apache.org/viewvc?rev=883515&view=rev
Log:
PIG-872: use distributed cache for the replicated data set in FR join
(sriranjan via olgan)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=883515&r1=883514&r2=883515&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Nov 23 21:59:39 2009
@@ -24,6 +24,9 @@
 
 IMPROVEMENTS
 
+PIG-872: use distributed cache for the replicated data set in FR join
+(sriranjan via olgan)
+
 PIG-1053: Consider moving to Hadoop for local mode (ankit.modi via olgan)
 
 PIG-1085:  Pass JobConf and UDF specific configuration information to UDFs

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=883515&r1=883514&r2=883515&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Nov 23 21:59:39 2009
@@ -594,6 +594,25 @@
                                
jobConf.setMapOutputKeyClass(NullablePartitionWritable.class);
             }
             
+            if (mro.isFrjoin()) {
+                // set up distributed cache for the replicated files
+                FileSpec[] replFiles = mro.getReplFiles();
+                ArrayList<String> replicatedPath = new ArrayList<String>();
+                // the first input is not replicated
+                for(int i=0; i < replFiles.length; i++) {
+                    // ignore fragmented file
+                    if (i != mro.getFragment()) {
+                        replicatedPath.add(replFiles[i].getFileName());
+                    }
+                }
+                try {
+                    setupDistributedCache(pigContext, jobConf, 
replicatedPath.toArray(new String[0]) , false);
+                } catch (IOException e) {
+                    String msg = "Internal error. Distributed cache could not 
be set up for the replicated files";
+                    throw new IOException (msg, e);
+                }
+            }
+            
             jobConf.setOutputCommitter(PigOutputCommitter.class);
             Job job = new Job(jobConf);
             jobStoreMap.put(job,new Pair<List<POStore>, Path>(storeLocations, 
tmpLocation));
@@ -853,66 +872,71 @@
                                               Properties properties, String 
key, 
                                               boolean shipToCluster) 
     throws IOException {
-        // Turn on the symlink feature
-        DistributedCache.createSymlink(conf);
-
         // Set up the DistributedCache for this job        
         String fileNames = properties.getProperty(key);
+        
         if (fileNames != null) {
             String[] paths = fileNames.split(",");
+            setupDistributedCache(pigContext, conf, paths, shipToCluster);
+        }
+    }
+        
+    private static void setupDistributedCache(PigContext pigContext,
+            Configuration conf, String[] paths, boolean shipToCluster) throws 
IOException {
+        // Turn on the symlink feature
+        DistributedCache.createSymlink(conf);
             
-            for (String path : paths) {
-                path = path.trim();
-                if (path.length() != 0) {
-                    Path src = new Path(path);
+        for (String path : paths) {
+            path = path.trim();
+            if (path.length() != 0) {
+                Path src = new Path(path);
+                
+                // Ensure that 'src' is a valid URI
+                URI srcURI = null;
+                try {
+                    srcURI = new URI(src.toString());
+                } catch (URISyntaxException ue) {
+                    int errCode = 6003;
+                    String msg = "Invalid cache specification. " +
+                    "File doesn't exist: " + src;
+                    throw new ExecException(msg, errCode, 
PigException.USER_ENVIRONMENT);
+                }
+                
+                // Ship it to the cluster if necessary and add to the
+                // DistributedCache
+                if (shipToCluster) {
+                    Path dst = 
+                        new Path(FileLocalizer.getTemporaryPath(null, 
pigContext).toString());
+                    FileSystem fs = dst.getFileSystem(conf);
+                    fs.copyFromLocalFile(src, dst);
                     
-                    // Ensure that 'src' is a valid URI
-                    URI srcURI = null;
+                    // Construct the dst#srcName uri for DistributedCache
+                    URI dstURI = null;
                     try {
-                        srcURI = new URI(src.toString());
+                        dstURI = new URI(dst.toString() + "#" + src.getName());
                     } catch (URISyntaxException ue) {
-                        int errCode = 6003;
-                        String msg = "Invalid cache specification. " +
-                        "File doesn't exist: " + src;
-                        throw new ExecException(msg, errCode, 
PigException.USER_ENVIRONMENT);
-                    }
-                    
-                    // Ship it to the cluster if necessary and add to the
-                    // DistributedCache
-                    if (shipToCluster) {
-                        Path dst = 
-                            new Path(FileLocalizer.getTemporaryPath(null, 
pigContext).toString());
-                        FileSystem fs = dst.getFileSystem(conf);
-                        fs.copyFromLocalFile(src, dst);
-                        
-                        // Construct the dst#srcName uri for DistributedCache
-                        URI dstURI = null;
-                        try {
-                            dstURI = new URI(dst.toString() + "#" + 
src.getName());
-                        } catch (URISyntaxException ue) {
-                            byte errSrc = pigContext.getErrorSource();
-                            int errCode = 0;
-                            switch(errSrc) {
-                            case PigException.REMOTE_ENVIRONMENT:
-                                errCode = 6004;
-                                break;
-                            case PigException.USER_ENVIRONMENT:
-                                errCode = 4004;
-                                break;
-                            default:
-                                errCode = 2037;
-                                break;
-                            }
-                            String msg = "Invalid ship specification. " +
-                            "File doesn't exist: " + dst;
-                            throw new ExecException(msg, errCode, errSrc);
+                        byte errSrc = pigContext.getErrorSource();
+                        int errCode = 0;
+                        switch(errSrc) {
+                        case PigException.REMOTE_ENVIRONMENT:
+                            errCode = 6004;
+                            break;
+                        case PigException.USER_ENVIRONMENT:
+                            errCode = 4004;
+                            break;
+                        default:
+                            errCode = 2037;
+                            break;
                         }
-                        DistributedCache.addCacheFile(dstURI, conf);
-                    } else {
-                        DistributedCache.addCacheFile(srcURI, conf);
+                        String msg = "Invalid ship specification. " +
+                        "File doesn't exist: " + dst;
+                        throw new ExecException(msg, errCode, errSrc);
                     }
+                    DistributedCache.addCacheFile(dstURI, conf);
+                } else {
+                    DistributedCache.addCacheFile(srcURI, conf);
                 }
             }
-        }
+        }        
     }
 }


Reply via email to