Author: zly
Date: Fri May  5 08:39:42 2017
New Revision: 1793981

URL: http://svn.apache.org/viewvc?rev=1793981&view=rev
Log:
PIG-5215:Merge changes from review board to spark branch(Liyun)

Modified:
    pig/branches/spark/ivy/libraries.properties
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java

Modified: pig/branches/spark/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/branches/spark/ivy/libraries.properties?rev=1793981&r1=1793980&r2=1793981&view=diff
==============================================================================
--- pig/branches/spark/ivy/libraries.properties (original)
+++ pig/branches/spark/ivy/libraries.properties Fri May  5 08:39:42 2017
@@ -17,7 +17,6 @@
 accumulo15.version=1.5.0
 apacheant.version=1.7.1
 apacherat.version=0.8
-asm.version=3.2
 automaton.version=1.11-8
 avro.version=1.7.5
 basjes-httpdlog-pigloader.version=2.4

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1793981&r1=1793980&r2=1793981&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Fri May  5 08:39:42 2017
@@ -224,7 +224,7 @@ public class PigSplit extends InputSplit
                             else
                                 locMap.put(loc, lenInMap + split.getLength());
                         } catch (InterruptedException e) {
-                            throw new RuntimeException("InputSplit.getLength throws exception: ", e);
+                            throw new IOException("InputSplit.getLength throws exception: ", e);
                         }
                     }
                 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1793981&r1=1793980&r2=1793981&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Fri May  5 08:39:42 2017
@@ -44,7 +44,12 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1793981&r1=1793980&r2=1793981&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Fri May  5 08:39:42 2017
@@ -389,15 +389,20 @@ public class SparkLauncher extends Launc
             tmpFolder.deleteOnExit();
             for (String file : cacheFiles.split(",")) {
                 String fileName = extractFileName(file.trim());
-                Path src = new Path(extractFileUrl(file.trim()));
-                File tmpFile = new File(tmpFolder, fileName);
-                Path tmpFilePath = new Path(tmpFile.getAbsolutePath());
-                FileSystem fs = tmpFilePath.getFileSystem(jobConf);
-                fs.copyToLocalFile(src, tmpFilePath);
-                tmpFile.deleteOnExit();
-                LOG.info(String.format("CacheFile:%s", fileName));
-                addResourceToSparkJobWorkingDirectory(tmpFile, fileName,
-                        ResourceType.FILE);
+                if( fileName != null) {
+                    String fileUrl = extractFileUrl(file.trim());
+                    if( fileUrl != null) {
+                        Path src = new Path(fileUrl);
+                        File tmpFile = new File(tmpFolder, fileName);
+                        Path tmpFilePath = new Path(tmpFile.getAbsolutePath());
+                        FileSystem fs = tmpFilePath.getFileSystem(jobConf);
+                        fs.copyToLocalFile(src, tmpFilePath);
+                        tmpFile.deleteOnExit();
+                        LOG.info(String.format("CacheFile:%s", fileName));
+                        addResourceToSparkJobWorkingDirectory(tmpFile, fileName,
+                                ResourceType.FILE);
+                    }
+                }
             }
         }
     }
@@ -484,26 +489,14 @@ public class SparkLauncher extends Launc
         String[] tmpAry = cacheFileUrl.split("#");
         String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[1]
                 : null;
-        if (fileName == null) {
-            throw new RuntimeException("cache file is invalid format, file:"
-                    + cacheFileUrl);
-        } else {
-            LOG.debug("Cache file name is valid:" + cacheFileUrl);
-            return fileName;
-        }
+        return fileName;
     }
 
     private String extractFileUrl(String cacheFileUrl) {
         String[] tmpAry = cacheFileUrl.split("#");
         String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[0]
                 : null;
-        if (fileName == null) {
-            throw new RuntimeException("cache file is invalid format, file:"
-                    + cacheFileUrl);
-        } else {
-            LOG.debug("Cache file name is valid:" + cacheFileUrl);
-            return fileName;
-        }
+        return fileName;
     }
 
     public SparkOperPlan compile(PhysicalPlan physicalPlan,

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1793981&r1=1793980&r2=1793981&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Fri May  5 08:39:42 2017
@@ -138,17 +138,12 @@ public class LoadConverter implements RD
                 SparkUtil.getManifest(Tuple.class));
     }
 
-    private void registerUdfFiles() {
+    private void registerUdfFiles() throws MalformedURLException{
         Map<String, File> scriptFiles = pigContext.getScriptFiles();
         for (Map.Entry<String, File> scriptFile : scriptFiles.entrySet()) {
-            try {
-                File script = scriptFile.getValue();
-                if (script.exists()) {
-                    sparkContext.addFile(script.toURI().toURL().toExternalForm());
-                }
-            } catch (MalformedURLException e) {
-                String msg = "Problem while registering UDF jars and files in LoadConverter.";
-                throw new RuntimeException(msg, e);
+            File script = scriptFile.getValue();
+            if (script.exists()) {
+                sparkContext.addFile(script.toURI().toURL().toExternalForm());
             }
         }
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java?rev=1793981&r1=1793980&r2=1793981&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java Fri May  5 08:39:42 2017
@@ -32,8 +32,6 @@ import org.apache.pig.impl.plan.VisitorE
 
 public class POPoissonSampleSpark extends POPoissonSample {
     private static final Log LOG = LogFactory.getLog(POPoissonSampleSpark.class);
-    //TODO verify can be removed?
-    //private static final long serialVersionUID = 1L;
     // Only for Spark
     private transient boolean endOfInput = false;
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java?rev=1793981&r1=1793980&r2=1793981&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java Fri May  5 08:39:42 2017
@@ -84,12 +84,12 @@ public class SecondaryKeyOptimizerSpark
             try {
                 mapPlan = getMapPlan(sparkOperator.physicalPlan, currentLR);
             } catch (PlanException e) {
-                throw new RuntimeException(e);
+                throw new VisitorException(e);
             }
             try {
                 reducePlan = getReducePlan(sparkOperator.physicalPlan, currentLR);
             } catch (PlanException e) {
-                throw new RuntimeException(e);
+                throw new VisitorException(e);
             }
 
             // Current code does not enable secondarykey optimization when join case is encounted

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1793981&r1=1793980&r2=1793981&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Fri May  5 08:39:42 2017
@@ -1352,11 +1352,7 @@ public class SparkCompiler extends PhyPl
         // if transform plans are not specified, project the columns of sorting keys
         if (transformPlans == null) {
             Pair<POProject, Byte>[] sortProjs = null;
-            try {
-                sortProjs = getSortCols(sort.getSortPlans());
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
+            sortProjs = getSortCols(sort.getSortPlans());
             // Set up the projections of the key columns
             if (sortProjs == null) {
                 PhysicalPlan ep = new PhysicalPlan();

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java?rev=1793981&r1=1793980&r2=1793981&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java Fri May  5 08:39:42 2017
@@ -28,19 +28,19 @@ import org.apache.pig.impl.plan.VisitorE
  */
 public class SparkOperPlan extends OperatorPlan<SparkOperator> {
 
-       @Override
-       public String toString() {
-               ByteArrayOutputStream baos = new ByteArrayOutputStream();
-               PrintStream ps = new PrintStream(baos);
-               SparkPrinter printer = new SparkPrinter(ps, this);
-               printer.setVerbose(true);
-               try {
-                       printer.visit();
-               } catch (VisitorException e) {
-                       // TODO Auto-generated catch block
-                       throw new RuntimeException(
-                                       "Unable to get String representation of plan:" + e, e);
-               }
-               return baos.toString();
-       }
+    @Override
+    public String toString() {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(baos);
+        SparkPrinter printer = new SparkPrinter(ps, this);
+        printer.setVerbose(true);
+        try {
+            printer.visit();
+        } catch (VisitorException e) {
+            // TODO Auto-generated catch block
+            throw new RuntimeException(
+                    "Unable to get String representation of plan:" + e, e);
+        }
+        return baos.toString();
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java?rev=1793981&r1=1793980&r2=1793981&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java Fri May  5 08:39:42 2017
@@ -107,7 +107,7 @@ public class SparkPOPackageAnnotator ext
 
             if (pkg.getPkgr() instanceof LitePackager) {
                 if (lrearrange.getIndex() != 0) {
-                    throw new RuntimeException(
+                    throw new VisitorException(
                             "POLocalRearrange for POPackageLite cannot have index other than 0, but has index - "
                                     + lrearrange.getIndex());
                 }


Reply via email to