Author: rding
Date: Mon Jul 12 17:26:16 2010
New Revision: 963368

URL: http://svn.apache.org/viewvc?rev=963368&view=rev
Log:
PIG-1490: Make Pig storers work with remote HDFS in secure mode

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/test/org/apache/pig/test/TestParser.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=963368&r1=963367&r2=963368&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Jul 12 17:26:16 2010
@@ -98,6 +98,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1490: Make Pig storers work with remote HDFS in secure mode (rding)
+
 PIG-1469: DefaultDataBag assumes ArrayList as default List type (azaroth via 
dvryaboy)
 
 PIG-1467: order by fail when set "fs.file.impl.disable.cache" to true (daijy)

Modified: hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java?rev=963368&r1=963367&r2=963368&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java Mon Jul 12 17:26:16 2010
@@ -166,9 +166,9 @@ public abstract class StoreFunc implemen
      * @throws IOException
      */
     public static void cleanupOnFailureImpl(String location, Job job) 
-    throws IOException {
-        FileSystem fs = FileSystem.get(job.getConfiguration());
+    throws IOException {        
         Path path = new Path(location);
+        FileSystem fs = path.getFileSystem(job.getConfiguration());
         if(fs.exists(path)){
             fs.delete(path, true);
         }    

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=963368&r1=963367&r2=963368&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 Mon Jul 12 17:26:16 2010
@@ -485,9 +485,9 @@ public class MapReduceLauncher extends L
     }
     
     private void createSuccessFile(Job job, POStore store) throws IOException {
-        if(shouldMarkOutputDir(job)) {
-            FileSystem fs = FileSystem.get(job.getJobConf());
+        if(shouldMarkOutputDir(job)) {            
             Path outputPath = new Path(store.getSFile().getFileName());
+            FileSystem fs = outputPath.getFileSystem(job.getJobConf());
             if(fs.exists(outputPath)){
                 // create a file in the folder to mark it
                 Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=963368&r1=963367&r2=963368&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
(original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
Mon Jul 12 17:26:16 2010
@@ -504,6 +504,41 @@ public class QueryParser {
             return result;
         }
 
+    void setHdfsServers(String absolutePath, PigContext pigContext) throws 
URISyntaxException {
+        // Get native host
+        String defaultFS = 
(String)pigContext.getProperties().get("fs.default.name");
+        URI defaultFSURI = new URI(defaultFS);
+        String defaultHost = defaultFSURI.getHost();
+        if (defaultHost == null) defaultHost = "";
+                
+        defaultHost = defaultHost.toLowerCase();
+    
+        Set<String> remoteHosts = getRemoteHosts(absolutePath, defaultHost);
+                    
+        String hdfsServersString = 
(String)pigContext.getProperties().get("mapreduce.job.hdfs-servers");
+        if (hdfsServersString == null) hdfsServersString = "";
+        String hdfsServers[] = hdfsServersString.split(",");
+                    
+        for (String remoteHost : remoteHosts) {
+            boolean existing = false;
+            for (String hdfsServer : hdfsServers) {
+                if (hdfsServer.equals(remoteHost)) {
+                    existing = true;
+                }
+            }
+            if (!existing) {
+                if (!hdfsServersString.isEmpty()) {
+                    hdfsServersString = hdfsServersString + ",";
+                }
+                hdfsServersString = hdfsServersString + remoteHost;
+            }
+        }
+    
+        if (!hdfsServersString.isEmpty()) {
+            
pigContext.getProperties().setProperty("mapreduce.job.hdfs-servers", 
hdfsServersString);
+        }
+    }
+    
      // Check and set files to be automatically shipped for the given 
StreamingCommand
      // Auto-shipping rules:
      // 1. If the command begins with either perl or python assume that the 
@@ -1409,36 +1444,8 @@ LogicalOperator LoadClause(LogicalPlan l
                 absolutePath = loFunc.relativeToAbsolutePath(filename, 
getCurrentDir(pigContext));
                 
                 if (absolutePath!=null) {
-                       // Get native host
-                       String defaultFS = 
(String)pigContext.getProperties().get("fs.default.name");
-                       URI defaultFSURI = new URI(defaultFS);
-                       String defaultHost = defaultFSURI.getHost();
-                       if (defaultHost==null)
-                           defaultHost="";
-                       defaultHost = defaultHost.toLowerCase();
-       
-                       Set<String> remoteHosts = getRemoteHosts(absolutePath, 
defaultHost);
-                       
-                       String hdfsServersString = 
(String)pigContext.getProperties().get("mapreduce.job.hdfs-servers");
-                       if (hdfsServersString==null) hdfsServersString="";
-                       String hdfsServers[] = hdfsServersString.split(",");
-                       
-                       for (String remoteHost : remoteHosts) {
-                           boolean existing = false;
-                           for (String hdfsServer:hdfsServers) {
-                               if (hdfsServer.equals(remoteHost))
-                                   existing = true;
-                           }
-                           if (!existing) {
-                               if (!hdfsServersString.isEmpty())
-                                   hdfsServersString = hdfsServersString + ",";
-                               hdfsServersString = hdfsServersString + 
remoteHost;
-                           }
-                       }
-       
-                       if (!hdfsServersString.isEmpty())
-                           
pigContext.getProperties().setProperty("mapreduce.job.hdfs-servers", 
hdfsServersString);
-                   }
+                    setHdfsServers(absolutePath, pigContext);
+                }
                 fileNameMap.put(constructFileNameSignature(filename, 
funcSpec), absolutePath);
             }
             lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new 
FileSpec(absolutePath, funcSpec),
@@ -2578,6 +2585,9 @@ LogicalOperator StoreClause(LogicalPlan 
         String absolutePath = 
fileNameMap.get(constructFileNameSignature(fileName, funcSpec));
         if (absolutePath == null) {
             absolutePath = stoFunc.relToAbsPathForStoreLocation(fileName, 
getCurrentDir(pigContext));
+            if (absolutePath != null) {
+                setHdfsServers(absolutePath, pigContext);
+            }
             fileNameMap.put(constructFileNameSignature(fileName, funcSpec), 
absolutePath);   
         }
         LogicalOperator store = new LOStore(lp, new OperatorKey(scope, 
getNextId()),

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestParser.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestParser.java?rev=963368&r1=963367&r2=963368&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestParser.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestParser.java Mon Jul 12 
17:26:16 2010
@@ -119,4 +119,37 @@ protected final Log log = LogFactory.get
         } catch (IOException io) {
         }
     }
+    
+    @Test
+    public void testRemoteServerList2() throws ExecException, IOException {
+        try {
+            Properties pigProperties = 
pigServer.getPigContext().getProperties();
+
+            pigServer.setBatchOn();
+            
+            pigServer.registerQuery("a = load '/user/pig/1.txt';");
+            pigServer.registerQuery("store a into '/user/pig/1.txt';");
+            
+            System.out.println("hdfs-servers: " + 
pigProperties.getProperty("mapreduce.job.hdfs-servers"));
+            
assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")==null);
+                       
+            pigServer.registerQuery("store a into 
'hdfs://b.com/user/pig/1.txt';");
+            System.out.println("hdfs-servers: " + 
pigProperties.getProperty("mapreduce.job.hdfs-servers"));
+            
assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")!=null &&
+                    
pigProperties.getProperty("mapreduce.job.hdfs-servers").contains("hdfs://b.com"));
+                        
+            pigServer.registerQuery("store a into 
'har://hdfs-c.com:8020/user/pig/1.txt';");
+            System.out.println("hdfs-servers: " + 
pigProperties.getProperty("mapreduce.job.hdfs-servers"));
+            
assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")!=null &&
+                    
pigProperties.getProperty("mapreduce.job.hdfs-servers").contains("hdfs://c.com:8020"));
+                        
+            pigServer.registerQuery("store a into 
'hdfs://d.com:8020/user/pig/1.txt';");
+            System.out.println("hdfs-servers: " + 
pigProperties.getProperty("mapreduce.job.hdfs-servers"));
+            
assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")!=null &&
+                    
pigProperties.getProperty("mapreduce.job.hdfs-servers").contains("hdfs://d.com:8020"));
+
+        } catch (IOException io) {
+            fail(io.getMessage());
+        }
+    }
 }


Reply via email to