Author: daijy
Date: Fri May 14 18:54:28 2010
New Revision: 944391

URL: http://svn.apache.org/viewvc?rev=944391&view=rev
Log:
PIG-1403: Make Pig work with remote HDFS in secure mode

Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/test/org/apache/pig/test/TestParser.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=944391&r1=944390&r2=944391&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri May 14 18:54:28 2010
@@ -276,6 +276,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-1403: Make Pig work with remote HDFS in secure mode (daijy)
+
 PIG-1394: POCombinerPackage hold too much memory for InternalCachedBag (daijy)
 
 PIG-1374: PushDownForeachFlatten shall not push ForEach below Join if the 
flattened fields is used in the next statement (daijy)

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=944391&r1=944390&r2=944391&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
 Fri May 14 18:54:28 2010
@@ -38,6 +38,8 @@ import java.util.Properties;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.ExecType;
@@ -166,6 +168,10 @@ public class HExecutionEngine implements
             jc = new JobConf();
             jc.addResource("pig-cluster-hadoop-site.xml");
             
+            // Trick to invoke static initializer of DistributedFileSystem to 
add hdfs-default.xml 
+            // into configuration
+            new DistributedFileSystem();
+            
             //the method below alters the properties object by overriding the
             //hadoop properties with the values from properties and recomputing
             //the properties

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=944391&r1=944390&r2=944391&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
(original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
Fri May 14 18:54:28 2010
@@ -467,6 +467,39 @@ public class QueryParser {
         LogicalOperator getOp(String alias) {
                return mapAliasOp.get(alias);
         }
+        
+        Set<String> getRemoteHosts(String absolutePath, String defaultHost) {
+            String HAR_PREFIX = "hdfs-";
+            Set<String> result = new HashSet<String>();
+            String[] fnames = absolutePath.split(",");
+            for (String fname: fnames) {
+                // remove leading/trailing whitespace(s)
+                fname = fname.trim();
+                Path p = new Path(fname);
+                URI uri = p.toUri();
+                if(uri.isAbsolute()) {
+                    String scheme = uri.getScheme();
+                    if (scheme!=null && 
(scheme.toLowerCase().equals("hdfs")||scheme.toLowerCase().equals("har"))) {
+                        if (uri.getHost()==null)
+                            continue;
+                        String thisHost = uri.getHost().toLowerCase();
+                        if (scheme.toLowerCase().equals("har")) {
+                            if (thisHost.startsWith(HAR_PREFIX)) {
+                                thisHost = 
thisHost.substring(HAR_PREFIX.length());
+                            }
+                        }
+                        if (!uri.getHost().isEmpty() && 
+                                !thisHost.equals(defaultHost)) {
+                            if (uri.getPort()!=-1)
+                                
result.add("hdfs://"+thisHost+":"+uri.getPort());
+                            else
+                                result.add("hdfs://"+thisHost);
+                        }
+                    }
+                }
+            }
+            return result;
+        }
 
      // Check and set files to be automatically shipped for the given 
StreamingCommand
      // Auto-shipping rules:
@@ -1350,6 +1383,36 @@ LogicalOperator LoadClause(LogicalPlan l
             String absolutePath = 
fileNameMap.get(constructFileNameSignature(filename, funcSpec));
             if (absolutePath == null) {
                 absolutePath = loFunc.relativeToAbsolutePath(filename, 
getCurrentDir(pigContext));
+                
+                // Get native host
+                String defaultFS = 
(String)pigContext.getProperties().get("fs.default.name");
+                URI defaultFSURI = new URI(defaultFS);
+                String defaultHost = defaultFSURI.getHost();
+                if (defaultHost==null)
+                    defaultHost="";
+                defaultHost = defaultHost.toLowerCase();
+
+                Set<String> remoteHosts = getRemoteHosts(absolutePath, 
defaultHost);
+                
+                String hdfsServersString = 
(String)pigContext.getProperties().get("mapreduce.job.hdfs-servers");
+                if (hdfsServersString==null) hdfsServersString="";
+                String hdfsServers[] = hdfsServersString.split(",");
+                
+                for (String remoteHost : remoteHosts) {
+                    boolean existing = false;
+                    for (String hdfsServer:hdfsServers) {
+                        if (hdfsServer.equals(remoteHost))
+                            existing = true;
+                    }
+                    if (!existing) {
+                        if (!hdfsServersString.isEmpty())
+                            hdfsServersString = hdfsServersString + ",";
+                        hdfsServersString = hdfsServersString + remoteHost;
+                    }
+                }
+
+                if (!hdfsServersString.isEmpty())
+                    
pigContext.getProperties().setProperty("mapreduce.job.hdfs-servers", 
hdfsServersString);
                 fileNameMap.put(constructFileNameSignature(filename, 
funcSpec), absolutePath);
             }
             lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new 
FileSpec(absolutePath, funcSpec),

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestParser.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestParser.java?rev=944391&r1=944390&r2=944391&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestParser.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestParser.java Fri May 14 
18:54:28 2010
@@ -22,6 +22,7 @@ import static org.apache.pig.ExecType.LO
 import static org.apache.pig.ExecType.MAPREDUCE;
 
 import java.io.IOException;
+import java.util.Properties;
 
 import junit.framework.TestCase;
 
@@ -86,4 +87,36 @@ protected final Log log = LogFactory.get
         } catch (IOException io) {
         }
     }
+    
+    @Test
+    public void testRemoteServerList() throws ExecException, IOException {
+        try {
+            Properties pigProperties = 
pigServer.getPigContext().getProperties();
+            pigProperties.setProperty("fs.default.name", "hdfs://a.com:8020");
+            
+            pigServer.registerQuery("a = load '/user/pig/1.txt';");
+            
assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")==null);
+            
+            pigServer.registerQuery("a = load 'hdfs://a.com/user/pig/1.txt';");
+            
assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")==null);
+            
+            pigServer.registerQuery("a = load 'har:///1.txt';");
+            
assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")==null);
+            
+            pigServer.registerQuery("a = load 'hdfs://b.com/user/pig/1.txt';");
+            
assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")!=null &&
+                    
pigProperties.getProperty("mapreduce.job.hdfs-servers").contains("hdfs://b.com"));
+            
+            pigServer.registerQuery("a = load 
'har://hdfs-c.com/user/pig/1.txt';");
+            
assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")!=null &&
+                    
pigProperties.getProperty("mapreduce.job.hdfs-servers").contains("hdfs://c.com"));
+            
+            pigServer.registerQuery("a = load 
'hdfs://d.com:8020/user/pig/1.txt';");
+            
assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")!=null &&
+                    
pigProperties.getProperty("mapreduce.job.hdfs-servers").contains("hdfs://d.com:8020"));
+
+
+        } catch (IOException io) {
+        }
+    }
 }


Reply via email to