Author: zly
Date: Mon Mar 20 21:59:36 2017
New Revision: 1787852

URL: http://svn.apache.org/viewvc?rev=1787852&view=rev
Log:
PIG-5187: UdfDistributedCache_1 is failing with spark exec type (Nandor via Liyun)

Added:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1787852&r1=1787851&r2=1787852&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Mon Mar 20 21:59:36 2017
@@ -232,7 +232,7 @@ public class SparkLauncher extends Launc
     }
 
     private void uploadResources(SparkOperPlan sparkPlan) throws IOException {
-        addFilesToSparkJob();
+        addFilesToSparkJob(sparkPlan);
         addJarsToSparkJob(sparkPlan);
     }
 
@@ -350,7 +350,7 @@ public class SparkLauncher extends Launc
         }
     }
 
-    private void addFilesToSparkJob() throws IOException {
+    private void addFilesToSparkJob(SparkOperPlan sparkPlan) throws IOException {
         LOG.info("add files Spark Job");
         String shipFiles = pigContext.getProperties().getProperty(
                 "pig.streaming.ship.files");
@@ -358,12 +358,22 @@ public class SparkLauncher extends Launc
         String cacheFiles = pigContext.getProperties().getProperty(
                 "pig.streaming.cache.files");
         cacheFiles(cacheFiles);
+        addUdfResourcesToSparkJob(sparkPlan);
     }
 
+    private void addUdfResourcesToSparkJob(SparkOperPlan sparkPlan) throws IOException {
+        SparkPOUserFuncVisitor sparkPOUserFuncVisitor = new SparkPOUserFuncVisitor(sparkPlan);
+        sparkPOUserFuncVisitor.visit();
+        Joiner joiner = Joiner.on(",");
+        String shipFiles = joiner.join(sparkPOUserFuncVisitor.getShipFiles());
+        shipFiles(shipFiles);
+        String cacheFiles = joiner.join(sparkPOUserFuncVisitor.getCacheFiles());
+        cacheFiles(cacheFiles);
+    }
 
     private void shipFiles(String shipFiles)
             throws IOException {
-        if (shipFiles != null) {
+        if (shipFiles != null && !shipFiles.isEmpty()) {
             for (String file : shipFiles.split(",")) {
                 File shipFile = new File(file.trim());
                 if (shipFile.exists()) {
@@ -376,7 +386,7 @@ public class SparkLauncher extends Launc
     }
 
     private void cacheFiles(String cacheFiles) throws IOException {
-        if (cacheFiles != null) {
+        if (cacheFiles != null && !cacheFiles.isEmpty()) {
             File tmpFolder = Files.createTempDirectory("cache").toFile();
             tmpFolder.deleteOnExit();
             for (String file : cacheFiles.split(",")) {

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java?rev=1787852&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java Mon Mar 20 21:59:36 2017
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.UdfCacheShipFilesVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class SparkPOUserFuncVisitor extends SparkOpPlanVisitor {
+    private Set<String> cacheFiles = new HashSet<>();
+    private Set<String> shipFiles = new HashSet<>();
+
+    public SparkPOUserFuncVisitor(SparkOperPlan plan) {
+        super(plan, new DepthFirstWalker<>(plan));
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOperator) throws VisitorException {
+        if(!sparkOperator.physicalPlan.isEmpty()) {
+            UdfCacheShipFilesVisitor udfCacheFileVisitor = new UdfCacheShipFilesVisitor(sparkOperator.physicalPlan);
+            udfCacheFileVisitor.visit();
+            cacheFiles.addAll(udfCacheFileVisitor.getCacheFiles());
+            shipFiles.addAll(udfCacheFileVisitor.getShipFiles());
+        }
+    }
+
+    public Set<String> getCacheFiles() {
+        return cacheFiles;
+    }
+
+    public Set<String> getShipFiles() {
+        return shipFiles;
+    }
+}


Reply via email to