Author: olga
Date: Mon Nov 30 16:24:25 2009
New Revision: 885483

URL: http://svn.apache.org/viewvc?rev=885483&view=rev
Log:
PIG-872: use distributed cache for the replicated data set in FR join (sriranjan via olgan)

Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=885483&r1=885482&r2=885483&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.6/CHANGES.txt Mon Nov 30 16:24:25 2009 @@ -24,6 +24,9 @@ IMPROVEMENTS +PIG-872: use distributed cache for the replicated data set in FR join +(sriranjan via olgan) + PIG-1085: Pass JobConf and UDF specific configuration information to UDFs (gates) Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=885483&r1=885482&r2=885483&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Nov 30 16:24:25 2009 @@ -594,6 +594,25 @@ jobConf.setMapOutputKeyClass(NullablePartitionWritable.class); } + if (mro.isFrjoin()) { + // set up distributed cache for the replicated files + FileSpec[] replFiles = mro.getReplFiles(); + ArrayList<String> replicatedPath = new ArrayList<String>(); + // the first input is not replicated + for(int i=0; i < replFiles.length; i++) { + // ignore fragmented file + if (i != mro.getFragment()) { + replicatedPath.add(replFiles[i].getFileName()); + } + } + try { + setupDistributedCache(pigContext, jobConf, 
replicatedPath.toArray(new String[0]) , false); + } catch (IOException e) { + String msg = "Internal error. Distributed cache could not be set up for the replicated files"; + throw new IOException (msg, e); + } + } + jobConf.setOutputCommitter(PigOutputCommitter.class); Job job = new Job(jobConf); jobStoreMap.put(job,new Pair<List<POStore>, Path>(storeLocations, tmpLocation)); @@ -853,66 +872,71 @@ Properties properties, String key, boolean shipToCluster) throws IOException { - // Turn on the symlink feature - DistributedCache.createSymlink(conf); - // Set up the DistributedCache for this job String fileNames = properties.getProperty(key); + if (fileNames != null) { String[] paths = fileNames.split(","); + setupDistributedCache(pigContext, conf, paths, shipToCluster); + } + } + + private static void setupDistributedCache(PigContext pigContext, + Configuration conf, String[] paths, boolean shipToCluster) throws IOException { + // Turn on the symlink feature + DistributedCache.createSymlink(conf); - for (String path : paths) { - path = path.trim(); - if (path.length() != 0) { - Path src = new Path(path); + for (String path : paths) { + path = path.trim(); + if (path.length() != 0) { + Path src = new Path(path); + + // Ensure that 'src' is a valid URI + URI srcURI = null; + try { + srcURI = new URI(src.toString()); + } catch (URISyntaxException ue) { + int errCode = 6003; + String msg = "Invalid cache specification. 
" + + "File doesn't exist: " + src; + throw new ExecException(msg, errCode, PigException.USER_ENVIRONMENT); + } + + // Ship it to the cluster if necessary and add to the + // DistributedCache + if (shipToCluster) { + Path dst = + new Path(FileLocalizer.getTemporaryPath(null, pigContext).toString()); + FileSystem fs = dst.getFileSystem(conf); + fs.copyFromLocalFile(src, dst); - // Ensure that 'src' is a valid URI - URI srcURI = null; + // Construct the dst#srcName uri for DistributedCache + URI dstURI = null; try { - srcURI = new URI(src.toString()); + dstURI = new URI(dst.toString() + "#" + src.getName()); } catch (URISyntaxException ue) { - int errCode = 6003; - String msg = "Invalid cache specification. " + - "File doesn't exist: " + src; - throw new ExecException(msg, errCode, PigException.USER_ENVIRONMENT); - } - - // Ship it to the cluster if necessary and add to the - // DistributedCache - if (shipToCluster) { - Path dst = - new Path(FileLocalizer.getTemporaryPath(null, pigContext).toString()); - FileSystem fs = dst.getFileSystem(conf); - fs.copyFromLocalFile(src, dst); - - // Construct the dst#srcName uri for DistributedCache - URI dstURI = null; - try { - dstURI = new URI(dst.toString() + "#" + src.getName()); - } catch (URISyntaxException ue) { - byte errSrc = pigContext.getErrorSource(); - int errCode = 0; - switch(errSrc) { - case PigException.REMOTE_ENVIRONMENT: - errCode = 6004; - break; - case PigException.USER_ENVIRONMENT: - errCode = 4004; - break; - default: - errCode = 2037; - break; - } - String msg = "Invalid ship specification. 
" + - "File doesn't exist: " + dst; - throw new ExecException(msg, errCode, errSrc); + byte errSrc = pigContext.getErrorSource(); + int errCode = 0; + switch(errSrc) { + case PigException.REMOTE_ENVIRONMENT: + errCode = 6004; + break; + case PigException.USER_ENVIRONMENT: + errCode = 4004; + break; + default: + errCode = 2037; + break; } - DistributedCache.addCacheFile(dstURI, conf); - } else { - DistributedCache.addCacheFile(srcURI, conf); + String msg = "Invalid ship specification. " + + "File doesn't exist: " + dst; + throw new ExecException(msg, errCode, errSrc); } + DistributedCache.addCacheFile(dstURI, conf); + } else { + DistributedCache.addCacheFile(srcURI, conf); } } - } + } } }