Author: daijy Date: Fri May 14 18:54:28 2010 New Revision: 944391 URL: http://svn.apache.org/viewvc?rev=944391&view=rev Log: PIG-1403: Make Pig work with remote HDFS in secure mode
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt hadoop/pig/trunk/test/org/apache/pig/test/TestParser.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=944391&r1=944390&r2=944391&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Fri May 14 18:54:28 2010 @@ -276,6 +276,8 @@ OPTIMIZATIONS BUG FIXES +PIG-1403: Make Pig work with remote HDFS in secure mode (daijy) + PIG-1394: POCombinerPackage hold too much memory for InternalCachedBag (daijy) PIG-1374: PushDownForeachFlatten shall not push ForEach below Join if the flattened fields is used in the next statement (daijy) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=944391&r1=944390&r2=944391&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri May 14 18:54:28 2010 @@ -38,6 +38,8 @@ import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.pig.ExecType; @@ -166,6 +168,10 @@ public class HExecutionEngine implements jc = new JobConf(); jc.addResource("pig-cluster-hadoop-site.xml"); + // Trick 
to invoke static initializer of DistributedFileSystem to add hdfs-default.xml + // into configuration + new DistributedFileSystem(); + //the method below alters the properties object by overriding the //hadoop properties with the values from properties and recomputing //the properties Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=944391&r1=944390&r2=944391&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri May 14 18:54:28 2010 @@ -467,6 +467,39 @@ public class QueryParser { LogicalOperator getOp(String alias) { return mapAliasOp.get(alias); } + + Set<String> getRemoteHosts(String absolutePath, String defaultHost) { + String HAR_PREFIX = "hdfs-"; + Set<String> result = new HashSet<String>(); + String[] fnames = absolutePath.split(","); + for (String fname: fnames) { + // remove leading/trailing whitespace(s) + fname = fname.trim(); + Path p = new Path(fname); + URI uri = p.toUri(); + if(uri.isAbsolute()) { + String scheme = uri.getScheme(); + if (scheme!=null && (scheme.toLowerCase().equals("hdfs")||scheme.toLowerCase().equals("har"))) { + if (uri.getHost()==null) + continue; + String thisHost = uri.getHost().toLowerCase(); + if (scheme.toLowerCase().equals("har")) { + if (thisHost.startsWith(HAR_PREFIX)) { + thisHost = thisHost.substring(HAR_PREFIX.length()); + } + } + if (!uri.getHost().isEmpty() && + !thisHost.equals(defaultHost)) { + if (uri.getPort()!=-1) + result.add("hdfs://"+thisHost+":"+uri.getPort()); + else + result.add("hdfs://"+thisHost); + } + } + } + } + return result; + } // Check and set files to be automatically shipped for the given StreamingCommand // Auto-shipping rules: @@ -1350,6 
+1383,36 @@ LogicalOperator LoadClause(LogicalPlan l String absolutePath = fileNameMap.get(constructFileNameSignature(filename, funcSpec)); if (absolutePath == null) { absolutePath = loFunc.relativeToAbsolutePath(filename, getCurrentDir(pigContext)); + + // Get native host + String defaultFS = (String)pigContext.getProperties().get("fs.default.name"); + URI defaultFSURI = new URI(defaultFS); + String defaultHost = defaultFSURI.getHost(); + if (defaultHost==null) + defaultHost=""; + defaultHost = defaultHost.toLowerCase(); + + Set<String> remoteHosts = getRemoteHosts(absolutePath, defaultHost); + + String hdfsServersString = (String)pigContext.getProperties().get("mapreduce.job.hdfs-servers"); + if (hdfsServersString==null) hdfsServersString=""; + String hdfsServers[] = hdfsServersString.split(","); + + for (String remoteHost : remoteHosts) { + boolean existing = false; + for (String hdfsServer:hdfsServers) { + if (hdfsServer.equals(remoteHost)) + existing = true; + } + if (!existing) { + if (!hdfsServersString.isEmpty()) + hdfsServersString = hdfsServersString + ","; + hdfsServersString = hdfsServersString + remoteHost; + } + } + + if (!hdfsServersString.isEmpty()) + pigContext.getProperties().setProperty("mapreduce.job.hdfs-servers", hdfsServersString); fileNameMap.put(constructFileNameSignature(filename, funcSpec), absolutePath); } lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new FileSpec(absolutePath, funcSpec), Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestParser.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestParser.java?rev=944391&r1=944390&r2=944391&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestParser.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestParser.java Fri May 14 18:54:28 2010 @@ -22,6 +22,7 @@ import static org.apache.pig.ExecType.LO import static org.apache.pig.ExecType.MAPREDUCE; 
import java.io.IOException; +import java.util.Properties; import junit.framework.TestCase; @@ -86,4 +87,36 @@ protected final Log log = LogFactory.get } catch (IOException io) { } } + + @Test + public void testRemoteServerList() throws ExecException, IOException { + try { + Properties pigProperties = pigServer.getPigContext().getProperties(); + pigProperties.setProperty("fs.default.name", "hdfs://a.com:8020"); + + pigServer.registerQuery("a = load '/user/pig/1.txt';"); + assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")==null); + + pigServer.registerQuery("a = load 'hdfs://a.com/user/pig/1.txt';"); + assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")==null); + + pigServer.registerQuery("a = load 'har:///1.txt';"); + assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")==null); + + pigServer.registerQuery("a = load 'hdfs://b.com/user/pig/1.txt';"); + assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")!=null && + pigProperties.getProperty("mapreduce.job.hdfs-servers").contains("hdfs://b.com")); + + pigServer.registerQuery("a = load 'har://hdfs-c.com/user/pig/1.txt';"); + assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")!=null && + pigProperties.getProperty("mapreduce.job.hdfs-servers").contains("hdfs://c.com")); + + pigServer.registerQuery("a = load 'hdfs://d.com:8020/user/pig/1.txt';"); + assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")!=null && + pigProperties.getProperty("mapreduce.job.hdfs-servers").contains("hdfs://d.com:8020")); + + + } catch (IOException io) { + } + } }