Author: olga
Date: Mon Nov 23 21:59:39 2009
New Revision: 883515
URL: http://svn.apache.org/viewvc?rev=883515&view=rev
Log:
PIG-872: use distributed cache for the replicated data set in FR join
(sriranjan via olgan)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=883515&r1=883514&r2=883515&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Nov 23 21:59:39 2009
@@ -24,6 +24,9 @@
IMPROVEMENTS
+PIG-872: use distributed cache for the replicated data set in FR join
+(sriranjan via olgan)
+
PIG-1053: Consider moving to Hadoop for local mode (ankit.modi via olgan)
PIG-1085: Pass JobConf and UDF specific configuration information to UDFs
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=883515&r1=883514&r2=883515&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Nov 23 21:59:39 2009
@@ -594,6 +594,25 @@
jobConf.setMapOutputKeyClass(NullablePartitionWritable.class);
}
+ if (mro.isFrjoin()) {
+ // set up distributed cache for the replicated files
+ FileSpec[] replFiles = mro.getReplFiles();
+ ArrayList<String> replicatedPath = new ArrayList<String>();
+ // the first input is not replicated
+ for(int i=0; i < replFiles.length; i++) {
+ // ignore fragmented file
+ if (i != mro.getFragment()) {
+ replicatedPath.add(replFiles[i].getFileName());
+ }
+ }
+ try {
+ setupDistributedCache(pigContext, jobConf, replicatedPath.toArray(new String[0]) , false);
+ } catch (IOException e) {
+ String msg = "Internal error. Distributed cache could not be set up for the replicated files";
+ throw new IOException (msg, e);
+ }
+ }
+
jobConf.setOutputCommitter(PigOutputCommitter.class);
Job job = new Job(jobConf);
jobStoreMap.put(job,new Pair<List<POStore>, Path>(storeLocations, tmpLocation));
@@ -853,66 +872,71 @@
Properties properties, String
key,
boolean shipToCluster)
throws IOException {
- // Turn on the symlink feature
- DistributedCache.createSymlink(conf);
-
// Set up the DistributedCache for this job
String fileNames = properties.getProperty(key);
+
if (fileNames != null) {
String[] paths = fileNames.split(",");
+ setupDistributedCache(pigContext, conf, paths, shipToCluster);
+ }
+ }
+
+ private static void setupDistributedCache(PigContext pigContext,
+ Configuration conf, String[] paths, boolean shipToCluster) throws IOException {
+ // Turn on the symlink feature
+ DistributedCache.createSymlink(conf);
- for (String path : paths) {
- path = path.trim();
- if (path.length() != 0) {
- Path src = new Path(path);
+ for (String path : paths) {
+ path = path.trim();
+ if (path.length() != 0) {
+ Path src = new Path(path);
+
+ // Ensure that 'src' is a valid URI
+ URI srcURI = null;
+ try {
+ srcURI = new URI(src.toString());
+ } catch (URISyntaxException ue) {
+ int errCode = 6003;
+ String msg = "Invalid cache specification. " +
+ "File doesn't exist: " + src;
+ throw new ExecException(msg, errCode,
PigException.USER_ENVIRONMENT);
+ }
+
+ // Ship it to the cluster if necessary and add to the
+ // DistributedCache
+ if (shipToCluster) {
+ Path dst =
+ new Path(FileLocalizer.getTemporaryPath(null,
pigContext).toString());
+ FileSystem fs = dst.getFileSystem(conf);
+ fs.copyFromLocalFile(src, dst);
- // Ensure that 'src' is a valid URI
- URI srcURI = null;
+ // Construct the dst#srcName uri for DistributedCache
+ URI dstURI = null;
try {
- srcURI = new URI(src.toString());
+ dstURI = new URI(dst.toString() + "#" + src.getName());
} catch (URISyntaxException ue) {
- int errCode = 6003;
- String msg = "Invalid cache specification. " +
- "File doesn't exist: " + src;
- throw new ExecException(msg, errCode,
PigException.USER_ENVIRONMENT);
- }
-
- // Ship it to the cluster if necessary and add to the
- // DistributedCache
- if (shipToCluster) {
- Path dst =
- new Path(FileLocalizer.getTemporaryPath(null,
pigContext).toString());
- FileSystem fs = dst.getFileSystem(conf);
- fs.copyFromLocalFile(src, dst);
-
- // Construct the dst#srcName uri for DistributedCache
- URI dstURI = null;
- try {
- dstURI = new URI(dst.toString() + "#" +
src.getName());
- } catch (URISyntaxException ue) {
- byte errSrc = pigContext.getErrorSource();
- int errCode = 0;
- switch(errSrc) {
- case PigException.REMOTE_ENVIRONMENT:
- errCode = 6004;
- break;
- case PigException.USER_ENVIRONMENT:
- errCode = 4004;
- break;
- default:
- errCode = 2037;
- break;
- }
- String msg = "Invalid ship specification. " +
- "File doesn't exist: " + dst;
- throw new ExecException(msg, errCode, errSrc);
+ byte errSrc = pigContext.getErrorSource();
+ int errCode = 0;
+ switch(errSrc) {
+ case PigException.REMOTE_ENVIRONMENT:
+ errCode = 6004;
+ break;
+ case PigException.USER_ENVIRONMENT:
+ errCode = 4004;
+ break;
+ default:
+ errCode = 2037;
+ break;
}
- DistributedCache.addCacheFile(dstURI, conf);
- } else {
- DistributedCache.addCacheFile(srcURI, conf);
+ String msg = "Invalid ship specification. " +
+ "File doesn't exist: " + dst;
+ throw new ExecException(msg, errCode, errSrc);
}
+ DistributedCache.addCacheFile(dstURI, conf);
+ } else {
+ DistributedCache.addCacheFile(srcURI, conf);
}
}
- }
+ }
}
}