Author: szita
Date: Mon May 29 15:00:39 2017
New Revision: 1796639

URL: http://svn.apache.org/viewvc?rev=1796639&view=rev
Log:
PIG-4059: Pig On Spark

Added:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemoverUtil.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POBroadcastSpark.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/KryoSerializer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecType.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/BroadcastConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IteratorTransform.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RDDConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SplitConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/UnionConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompilerException.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/XMLSparkPrinter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/running/
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/streaming/
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/streaming/SparkExecutableManager.java
    pig/trunk/src/org/apache/pig/tools/pigstats/PigWarnCounter.java
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
    pig/trunk/test/e2e/pig/conf/spark.conf
    pig/trunk/test/excluded-tests-spark
    pig/trunk/test/org/apache/pig/spark/
    pig/trunk/test/org/apache/pig/spark/TestIndexedKey.java
    pig/trunk/test/org/apache/pig/spark/TestSecondarySortSpark.java
    pig/trunk/test/org/apache/pig/spark/TestSparkCompiler.java
    pig/trunk/test/org/apache/pig/test/SparkMiniCluster.java
    pig/trunk/test/org/apache/pig/test/YarnMiniCluster.java
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/bin/pig
    pig/trunk/build.xml
    pig/trunk/ivy.xml
    pig/trunk/ivy/libraries.properties
    pig/trunk/src/META-INF/services/org.apache.pig.ExecType
    pig/trunk/src/docs/src/documentation/content/xdocs/start.xml
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/PigWarning.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
    pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
    pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
    pig/trunk/src/org/apache/pig/data/SelfSpillBag.java
    pig/trunk/src/org/apache/pig/impl/PigContext.java
    pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java
    pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
    pig/trunk/src/org/apache/pig/impl/util/UDFContext.java
    pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
    pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java
    pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
    pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
    pig/trunk/test/e2e/pig/build.xml
    pig/trunk/test/e2e/pig/drivers/TestDriverPig.pm
    pig/trunk/test/e2e/pig/tests/bigdata.conf
    pig/trunk/test/e2e/pig/tests/cmdline.conf
    pig/trunk/test/e2e/pig/tests/grunt.conf
    pig/trunk/test/e2e/pig/tests/hcat.conf
    pig/trunk/test/e2e/pig/tests/multiquery.conf
    pig/trunk/test/e2e/pig/tests/negative.conf
    pig/trunk/test/e2e/pig/tests/nightly.conf
    pig/trunk/test/e2e/pig/tests/orc.conf
    pig/trunk/test/e2e/pig/tests/streaming.conf
    pig/trunk/test/e2e/pig/tests/turing_jython.conf
    pig/trunk/test/e2e/pig/tools/test/floatpostprocessor.pl
    pig/trunk/test/excluded-tests-mr
    pig/trunk/test/excluded-tests-tez
    pig/trunk/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
    pig/trunk/test/org/apache/pig/pigunit/PigTest.java
    pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java
    pig/trunk/test/org/apache/pig/test/TestAssert.java
    pig/trunk/test/org/apache/pig/test/TestCase.java
    pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java
    pig/trunk/test/org/apache/pig/test/TestCombiner.java
    pig/trunk/test/org/apache/pig/test/TestCubeOperator.java
    pig/trunk/test/org/apache/pig/test/TestEmptyInputDir.java
    pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
    pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
    pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java
    pig/trunk/test/org/apache/pig/test/TestFinish.java
    pig/trunk/test/org/apache/pig/test/TestFlatten.java
    pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
    pig/trunk/test/org/apache/pig/test/TestGrunt.java
    pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
    pig/trunk/test/org/apache/pig/test/TestLimitVariable.java
    pig/trunk/test/org/apache/pig/test/TestLineageFindRelVisitor.java
    pig/trunk/test/org/apache/pig/test/TestMapSideCogroup.java
    pig/trunk/test/org/apache/pig/test/TestMergeJoinOuter.java
    pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
    pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
    pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java
    pig/trunk/test/org/apache/pig/test/TestNullConstant.java
    pig/trunk/test/org/apache/pig/test/TestPigRunner.java
    pig/trunk/test/org/apache/pig/test/TestPigServer.java
    pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java
    pig/trunk/test/org/apache/pig/test/TestProjectRange.java
    pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
    pig/trunk/test/org/apache/pig/test/TestRank1.java
    pig/trunk/test/org/apache/pig/test/TestRank2.java
    pig/trunk/test/org/apache/pig/test/TestRank3.java
    pig/trunk/test/org/apache/pig/test/TestSecondarySort.java
    pig/trunk/test/org/apache/pig/test/TestStoreBase.java
    pig/trunk/test/org/apache/pig/test/TezMiniCluster.java
    pig/trunk/test/org/apache/pig/test/Util.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon May 29 15:00:39 2017
@@ -36,6 +36,8 @@ PIG-5067: Revisit union on numeric type
  
 IMPROVEMENTS
 
+PIG-4059: Pig On Spark
+
 PIG-5188: Review pig-index.xml (szita)
 
 PIG-4924: Translate failures.maxpercent MR setting to Tez Tez (rohini)

Modified: pig/trunk/bin/pig
URL: 
http://svn.apache.org/viewvc/pig/trunk/bin/pig?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/bin/pig (original)
+++ pig/trunk/bin/pig Mon May 29 15:00:39 2017
@@ -57,6 +57,21 @@ remaining=()
 includeHCatalog="";
 addJarString=-Dpig.additional.jars.uris\=;
 additionalJars="";
+prevArgExecType=false;
+isSparkMode=false;
+isSparkLocalMode=false;
+
+#verify the execType is SPARK or SPARK_LOCAL or not
+function processExecType(){
+    execType=$1
+    execTypeUpperCase=$(echo $execType |tr [a-z] [A-Z])
+    if [[ "$execTypeUpperCase" == "SPARK" ]]; then
+       isSparkMode=true
+    elif [[ "$execTypeUpperCase" == "SPARK_LOCAL" ]]; then
+       isSparkLocalMode=true
+    fi
+}
+
 # filter command line parameter
 for f in "$@"; do
      if [[ $f == "-secretDebugCmd" || $f == "-printCmdDebug" ]]; then
@@ -70,6 +85,13 @@ for f in "$@"; do
         includeHCatalog=true;
       elif [[ "$includeHCatalog" == "true" && $f == $addJarString* ]]; then
         additionalJars=`echo $f | sed s/$addJarString//`
+      elif [[ "$f" == "-x" || "$f" == "-exectype" ]]; then
+        prevArgExecType=true;
+        remaining[${#remaining[@]}]="$f"
+      elif [[ "$prevArgExecType" == "true" ]]; then
+        prevArgExecType=false;
+        processExecType $f
+        remaining[${#remaining[@]}]="$f"
       else
         remaining[${#remaining[@]}]="$f"
      fi
@@ -362,6 +384,44 @@ if [ "$includeHCatalog" == "true" ]; the
   PIG_OPTS="$PIG_OPTS -Dpig.additional.jars.uris=$ADDITIONAL_CLASSPATHS"
 fi
 
+################# ADDING SPARK DEPENDENCIES ##################
+# For spark_local mode:
+if [ "$isSparkLocalMode" == "true" ]; then
+#SPARK_MASTER is forced to be "local" in spark_local mode
+        SPARK_MASTER="local"
+    for f in $PIG_HOME/lib/spark/*.jar; do
+            CLASSPATH=${CLASSPATH}:$f;
+    done
+fi
+
+# For spark mode:
+# Please specify SPARK_HOME first so that we can locate $SPARK_HOME/lib/spark-assembly*.jar,
+# we will add spark-assembly*.jar to the classpath.
+if [ "$isSparkMode"  == "true" ]; then
+    if [ -z "$SPARK_HOME" ]; then
+       echo "Error: SPARK_HOME is not set!"
+       exit 1
+    fi
+
+    # Please specify SPARK_JAR which is the hdfs path of spark-assembly*.jar to allow YARN to cache spark-assembly*.jar on nodes so that it doesn't need to be distributed each time an application runs.
+    if [ -z "$SPARK_JAR" ]; then
+       echo "Error: SPARK_JAR is not set, SPARK_JAR stands for the hdfs location of spark-assembly*.jar. This allows YARN to cache spark-assembly*.jar on nodes so that it doesn't need to be distributed each time an application runs."
+       exit 1
+    fi
+
+    if [ -n "$SPARK_HOME" ]; then
+        echo "Using Spark Home: " ${SPARK_HOME}
+        SPARK_ASSEMBLY_JAR=`ls ${SPARK_HOME}/lib/spark-assembly*`
+        CLASSPATH=${CLASSPATH}:$SPARK_ASSEMBLY_JAR
+    fi
+fi
+
+#spark-assembly.jar contains jcl-over-slf4j which would create a LogFactory implementation that is incompatible
+if [ "$isSparkMode"  == "true" ]; then
+    PIG_OPTS="$PIG_OPTS -Dorg.apache.commons.logging.LogFactory=org.apache.commons.logging.impl.LogFactoryImpl"
+fi
+################# ADDING SPARK DEPENDENCIES ##################
+
 # run it
 if [ -n "$HADOOP_BIN" ]; then
     if [ "$debug" == "true" ]; then

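For reference, with the checks added to bin/pig above in place, launching Pig in the two new exec types looks roughly like the sketch below (the SPARK_HOME/SPARK_JAR values and the script name are placeholders, not part of this patch):

    # spark_local mode: jars from $PIG_HOME/lib/spark are added to the classpath automatically
    pig -x spark_local myscript.pig

    # spark mode: bin/pig now exits with an error unless both variables below are set
    export SPARK_HOME=/opt/spark
    export SPARK_JAR=hdfs://namenode:8020/user/pig/spark-assembly.jar
    pig -x spark myscript.pig
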
Modified: pig/trunk/build.xml
URL: 
http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Mon May 29 15:00:39 2017
@@ -46,6 +46,7 @@
 
     <!-- source properties -->
     <property name="lib.dir" value="${basedir}/lib" />
+    <property name="spark.lib.dir" value="${basedir}/lib/spark" />
     <property name="src.dir" value="${basedir}/src" />
     <property name="python.src.dir" value="${src.dir}/python" />
     <property name="src.lib.dir" value="${basedir}/lib-src" />
@@ -106,9 +107,12 @@
     <property name="test.unit.file" value="${test.src.dir}/unit-tests"/>
     <property name="test.smoke.file" value="${test.src.dir}/smoke-tests"/>
     <property name="test.all.file" value="${test.src.dir}/all-tests"/>
+    <property name="test.spark.file" value="${test.src.dir}/spark-tests"/>
+    <property name="test.spark_local.file" value="${test.src.dir}/spark-local-tests"/>
     <property name="test.exclude.file" value="${test.src.dir}/excluded-tests"/>
     <property name="test.exclude.file.mr" value="${test.src.dir}/excluded-tests-mr"/>
     <property name="test.exclude.file.tez" value="${test.src.dir}/excluded-tests-tez"/>
+    <property name="test.exclude.file.spark" value="${test.src.dir}/excluded-tests-spark"/>
     <property name="pigunit.jarfile" value="pigunit.jar" />
     <property name="piggybank.jarfile" value="${basedir}/contrib/piggybank/java/piggybank.jar" />
     <property name="smoke.tests.jarfile" value="${build.dir}/${final.name}-smoketests.jar" />
@@ -160,6 +164,10 @@
         <propertyreset name="test.exec.type" value="tez" />
     </target>
 
+    <target name="setSparkEnv">
+      <propertyreset name="test.exec.type" value="spark" />
+    </target>
+
     <target name="setWindowsPath" if="${isWindows}">
       <property name="build.path" value="${env.Path};${hadoop.root}\bin" />
     </target>
@@ -253,6 +261,7 @@
     <property name="build.ivy.dir" location="${build.dir}/ivy" />
     <property name="build.ivy.lib.dir" location="${build.ivy.dir}/lib" />
     <property name="ivy.lib.dir" location="${build.ivy.lib.dir}/${ant.project.name}"/>
+    <property name="ivy.lib.dir.spark" location="${ivy.lib.dir}/spark" />
     <property name="build.ivy.report.dir" location="${build.ivy.dir}/report" />
     <property name="build.ivy.maven.dir" location="${build.ivy.dir}/maven" />
     <property name="pom.xml" location="${build.ivy.maven.dir}/pom.xml"/>
@@ -322,6 +331,9 @@
             <fileset dir="${ivy.lib.dir}">
                 <include name="**.*jar"/>
             </fileset>
+            <fileset dir="${ivy.lib.dir.spark}">
+                <include name="**.*jar"/>
+            </fileset>
         </path>
         <taskdef name="eclipse"
                  classname="prantl.ant.eclipse.EclipseTask"
@@ -352,6 +364,7 @@
     <path id="classpath">
         <fileset file="${ivy.lib.dir}/${zookeeper.jarfile}"/>
         <fileset dir="${ivy.lib.dir}" includes="*.jar"/>
+        <fileset dir="${ivy.lib.dir.spark}" includes="*.jar"/>
     </path>
 
     <!-- javadoc-classpath -->
@@ -370,6 +383,7 @@
 
     <fileset dir="${ivy.lib.dir}" id="core.dependencies.jar">
         <exclude name="**.*jar"/>
+        <exclude name="spark/**.*jar"/>
     </fileset>
 
     <fileset dir="${ivy.lib.dir}" id="runtime.dependencies-withouthadoop.jar">
@@ -678,6 +692,7 @@
         <buildJar svnString="${svn.revision}" outputFile="${output.jarfile.core}" includedJars="core.dependencies.jar"/>
         <buildJar svnString="${svn.revision}" outputFile="${output.jarfile.withouthadoop}" includedJars="runtime.dependencies-withouthadoop.jar"/>
         <antcall target="copyCommonDependencies"/>
+        <antcall target="copySparkDependencies"/>
         <antcall target="copyh2Dependencies"/>
         <antcall target="copyHadoop2LocalRuntimeDependencies" />
     </target>
@@ -715,9 +730,17 @@
             <fileset dir="${ivy.lib.dir}" includes="httpdlog-*-${basjes-httpdlog-pigloader.version}.jar"/>
             <fileset dir="${ivy.lib.dir}" includes="parser-core-${basjes-httpdlog-pigloader.version}.jar"/>
             <fileset dir="${ivy.lib.dir}" includes="ivy-*.jar"/>
+            <fileset dir="${ivy.lib.dir}" includes="commons-logging-*.jar"/>
         </copy>
     </target>
 
+    <target name="copySparkDependencies">
+        <mkdir dir="${spark.lib.dir}" />
+        <copy todir="${spark.lib.dir}">
+            <fileset dir="${ivy.lib.dir.spark}" includes="*.jar"/>
+        </copy>
+    </target>
+    
     <target name="copyh2Dependencies" if="isHadoop2">
         <mkdir dir="${lib.dir}/h2" />
         <copy todir="${lib.dir}/h2">
@@ -856,7 +879,12 @@
         <macro-test-runner test.file="${test.all.file}" tests.failed="test-tez.failed"/>
         <fail if="test-tez.failed">Tests failed!</fail>
     </target>
-       
+
+    <target name="test-spark" depends="setSparkEnv,setWindowsPath,setLinuxPath,compile-test,jar,debugger.check,jackson-pig-3039-test-download" description="Run Spark unit tests in Spark cluster-local mode">
+        <macro-test-runner test.file="${test.all.file}" tests.failed="test-spark.failed"/>
+        <fail if="test-spark.failed">Tests failed!</fail>
+    </target>
+
     <target name="debugger.check" depends="debugger.set,debugger.unset"/>
     <target name="debugger.set" if="debugPort">
         <property name="debugArgs" value="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=${debugPort}"/>
@@ -881,9 +909,6 @@
             <sysproperty key="test.exec.type" value="${test.exec.type}" />
             <sysproperty key="ssh.gateway" value="${ssh.gateway}" />
             <sysproperty key="hod.server" value="${hod.server}" />
-            <sysproperty key="build.classes" value="${build.classes}" />
-            <sysproperty key="test.build.classes" value="${test.build.classes}" />
-            <sysproperty key="ivy.lib.dir" value="${ivy.lib.dir}" />
             <sysproperty key="java.io.tmpdir" value="${junit.tmp.dir}" />
             <sysproperty key="hadoop.log.dir" value="${test.log.dir}"/>
             <jvmarg line="-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=128M ${debugArgs} -Djava.library.path=${hadoop.root}\bin"/>
@@ -978,6 +1003,10 @@
         <ant dir="${test.e2e.dir}" target="test-tez"/>
     </target>
 
+    <target name="test-e2e-spark" depends="jar, piggybank" description="run end-to-end tests in spark mode">
+            <ant dir="${test.e2e.dir}" target="test-spark"/>
+    </target>
+
     <target name="test-e2e-deploy" depends="jar" description="deploy end-to-end tests to existing cluster">
         <ant dir="${test.e2e.dir}" target="deploy"/>
     </target>
@@ -1624,6 +1653,8 @@
      <target name="ivy-compile" depends="ivy-resolve" description="Retrieve Ivy-managed artifacts for compile configuration">
        <ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}"
                  pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" conf="compile"/>
+       <ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}"
+                 pattern="${ivy.lib.dir.spark}/[artifact]-[revision](-[classifier]).[ext]" conf="spark"/>
        <ivy:cachepath pathid="compile.classpath" conf="compile"/>
      </target>
 

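The new build targets above can be exercised from a pig/trunk checkout roughly as follows (a sketch; the usual test-selection and Hadoop-version properties of the existing build apply unchanged):

    # unit tests against the Spark exec type (setSparkEnv sets test.exec.type=spark)
    ant test-spark

    # end-to-end tests in spark mode (delegates to the test-spark target in test/e2e/pig/build.xml)
    ant test-e2e-spark
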
Modified: pig/trunk/ivy.xml
URL: 
http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Mon May 29 15:00:39 2017
@@ -40,6 +40,7 @@
     <conf name="buildJar" extends="compile,test" visibility="private"/>
     <conf name="hadoop2" visibility="private"/>
     <conf name="hbase1" visibility="private"/>
+    <conf name="spark" visibility="private" />
   </configurations>
   <publications>
     <artifact name="pig" conf="master"/>
@@ -406,6 +407,24 @@
 
     <dependency org="com.twitter" name="parquet-pig-bundle" rev="${parquet-pig-bundle.version}" conf="compile->master"/>
 
+    <!-- for Spark integration -->
+    <dependency org="org.apache.spark" name="spark-core_2.11" rev="${spark.version}" conf="spark->default">
+        <exclude org="org.eclipse.jetty.orbit" module="javax.servlet"/>
+        <exclude org="org.eclipse.jetty.orbit" module="javax.transaction"/>
+        <exclude org="org.eclipse.jetty.orbit" module="javax.mail.glassfish"/>
+        <exclude org="org.eclipse.jetty.orbit" module="javax.activation"/>
+        <exclude org="org.apache.hadoop" />
+        <exclude org="com.esotericsoftware.kryo" />
+        <exclude org="jline" module="jline"/>
+        <exclude org="com.google.guava" />
+    </dependency>
+    <dependency org="org.apache.spark" name="spark-yarn_2.11" rev="${spark.version}" conf="spark->default">
+        <exclude org="org.apache.hadoop" />
+    </dependency>
+    <dependency org="asm" name="asm" rev="${asm.version}" conf="compile->master"/>
+    <dependency org="javax.servlet" name="javax.servlet-api" rev="3.0.1" conf="spark->default"/>
+    <dependency org="org.scala-lang.modules" name="scala-xml_2.11" rev="${scala-xml.version}" conf="spark->default"/>
+
     <!-- for Tez integration -->
     <dependency org="org.apache.tez" name="tez" rev="${tez.version}"
        conf="hadoop2->master"/>

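The new "spark" Ivy configuration above is retrieved into ${ivy.lib.dir.spark} by the ivy-compile change in build.xml and copied into lib/spark by the copySparkDependencies target, so after a build one would expect roughly the following (a sketch; exact jar names depend on spark.version):

    ant clean jar
    ls lib/spark/    # spark-core_2.11-*.jar, spark-yarn_2.11-*.jar, scala-xml_2.11-*.jar, ...
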
Modified: pig/trunk/ivy/libraries.properties
URL: 
http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Mon May 29 15:00:39 2017
@@ -73,6 +73,7 @@ netty-all.version=4.0.23.Final
 rats-lib.version=0.5.1
 slf4j-api.version=1.6.1
 slf4j-log4j12.version=1.6.1
+spark.version=1.6.1
 xerces.version=2.10.0
 xalan.version=2.7.1
 wagon-http.version=1.0-beta-2
@@ -88,7 +89,7 @@ jsr311-api.version=1.1.1
 mockito.version=1.8.4
 jansi.version=1.9
 asm.version=3.3.1
-snappy-java.version=1.1.0.1
+snappy-java.version=1.1.1.3
 tez.version=0.7.0
 parquet-pig-bundle.version=1.2.3
 snappy.version=0.2
@@ -96,3 +97,4 @@ leveldbjni.version=1.8
 curator.version=2.6.0
 htrace.version=3.1.0-incubating
 commons-lang3.version=3.1
+scala-xml.version=1.0.5

Modified: pig/trunk/src/META-INF/services/org.apache.pig.ExecType
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/META-INF/services/org.apache.pig.ExecType?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/META-INF/services/org.apache.pig.ExecType (original)
+++ pig/trunk/src/META-INF/services/org.apache.pig.ExecType Mon May 29 15:00:39 2017
@@ -15,4 +15,5 @@ org.apache.pig.backend.hadoop.executione
 org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecType
 org.apache.pig.backend.hadoop.executionengine.tez.TezLocalExecType
 org.apache.pig.backend.hadoop.executionengine.tez.TezExecType
-
+org.apache.pig.backend.hadoop.executionengine.spark.SparkExecType
+org.apache.pig.backend.hadoop.executionengine.spark.SparkLocalExecType

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/start.xml
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/start.xml?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/start.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/start.xml Mon May 29 15:00:39 2017
@@ -26,45 +26,45 @@
 
 <!-- SET UP PIG -->
  <section>
-               <title>Pig Setup</title>
-       
+        <title>Pig Setup</title>
+    
 <!-- ++++++++++++++++++++++++++++++++++ -->
  <section id="req">
  <title>Requirements</title>
  <p><strong>Mandatory</strong></p>
       <p>Unix and Windows users need the following:</p>
-               <ul>
-                 <li> <strong>Hadoop 2.X</strong> - <a 
href="http://hadoop.apache.org/common/releases.html";>http://hadoop.apache.org/common/releases.html</a>
 (You can run Pig with different versions of Hadoop by setting HADOOP_HOME to 
point to the directory where you have installed Hadoop. If you do not set 
HADOOP_HOME, by default Pig will run with the embedded version, currently 
Hadoop 2.7.3.)</li>
-                 <li> <strong>Java 1.7</strong> - <a 
href="http://java.sun.com/javase/downloads/index.jsp";>http://java.sun.com/javase/downloads/index.jsp</a>
 (set JAVA_HOME to the root of your Java installation)</li>       
-               </ul>
-               <p></p>
+        <ul>
+          <li> <strong>Hadoop 2.X</strong> - <a 
href="http://hadoop.apache.org/common/releases.html";>http://hadoop.apache.org/common/releases.html</a>
 (You can run Pig with different versions of Hadoop by setting HADOOP_HOME to 
point to the directory where you have installed Hadoop. If you do not set 
HADOOP_HOME, by default Pig will run with the embedded version, currently 
Hadoop 2.7.3.)</li>
+          <li> <strong>Java 1.7</strong> - <a 
href="http://java.sun.com/javase/downloads/index.jsp";>http://java.sun.com/javase/downloads/index.jsp</a>
 (set JAVA_HOME to the root of your Java installation)</li>    
+        </ul>
+        <p></p>
  <p><strong>Optional</strong></p>
-               <ul>
+         <ul>
           <li> <strong>Python 2.7</strong> - <a 
href="http://jython.org/downloads.html";>https://www.python.org</a> (when using 
Streaming Python UDFs) </li>
           <li> <strong>Ant 1.8</strong> - <a 
href="http://ant.apache.org/";>http://ant.apache.org/</a> (for builds) </li>
-               </ul>
+        </ul>
  
   </section>         
    
 <!-- ++++++++++++++++++++++++++++++++++ -->        
  <section id="download">
  <title>Download Pig</title>
-       <p>To get a Pig distribution, do the following:</p>
-       
-       <ol>
-       <li>Download a recent stable release from one of the Apache Download 
Mirrors 
-       (see <a href="http://hadoop.apache.org/pig/releases.html";> Pig 
Releases</a>).</li>
-       
+    <p>To get a Pig distribution, do the following:</p>
+    
+    <ol>
+    <li>Download a recent stable release from one of the Apache Download 
Mirrors 
+    (see <a href="http://hadoop.apache.org/pig/releases.html";> Pig 
Releases</a>).</li>
+    
     <li>Unpack the downloaded Pig distribution, and then note the following:
-           <ul>
-           <li>The Pig script file, pig, is located in the bin directory 
(/pig-n.n.n/bin/pig). 
-           The Pig environment variables are described in the Pig script 
file.</li>
-           <li>The Pig properties file, pig.properties, is located in the conf 
directory (/pig-n.n.n/conf/pig.properties). 
-           You can specify an alternate location using the PIG_CONF_DIR 
environment variable.</li>
-       </ul>   
-       </li>
-       <li>Add /pig-n.n.n/bin to your path. Use export (bash,sh,ksh) or setenv 
(tcsh,csh). For example: <br></br>
-       <code>$ export PATH=/&lt;my-path-to-pig&gt;/pig-n.n.n/bin:$PATH</code>
+        <ul>
+        <li>The Pig script file, pig, is located in the bin directory 
(/pig-n.n.n/bin/pig). 
+        The Pig environment variables are described in the Pig script 
file.</li>
+        <li>The Pig properties file, pig.properties, is located in the conf 
directory (/pig-n.n.n/conf/pig.properties). 
+        You can specify an alternate location using the PIG_CONF_DIR 
environment variable.</li>
+    </ul>    
+    </li>
+    <li>Add /pig-n.n.n/bin to your path. Use export (bash,sh,ksh) or setenv 
(tcsh,csh). For example: <br></br>
+    <code>$ export PATH=/&lt;my-path-to-pig&gt;/pig-n.n.n/bin:$PATH</code>
 </li>
 <li>
 Test the Pig installation with this simple command: <code>$ pig -help</code>
@@ -78,10 +78,10 @@ Test the Pig installation with this simp
 <title>Build Pig</title>
       <p>To build pig, do the following:</p>
      <ol>
-         <li> Check out the Pig code from SVN: <code>svn co 
http://svn.apache.org/repos/asf/pig/trunk</code> </li>
-         <li> Build the code from the top directory: <code>ant</code> <br></br>
-         If the build is successful, you should see the pig.jar file created 
in that directory. </li>  
-         <li> Validate the pig.jar  by running a unit test: <code>ant 
test</code></li>
+      <li> Check out the Pig code from SVN: <code>svn co 
http://svn.apache.org/repos/asf/pig/trunk</code> </li>
+      <li> Build the code from the top directory: <code>ant</code> <br></br>
+      If the build is successful, you should see the pig.jar file created in 
that directory. </li>    
+      <li> Validate the pig.jar  by running a unit test: <code>ant 
test</code></li>
      </ol>
  </section>
 </section>
@@ -90,46 +90,53 @@ Test the Pig installation with this simp
     
    <!-- RUNNING PIG  -->
    <section id="run">
-       <title>Running Pig </title> 
-       <p>You can run Pig (execute Pig Latin statements and Pig commands) 
using various modes.</p>
-       <table>
-       <tr>
-       <td></td>
+    <title>Running Pig </title>
+    <p>You can run Pig (execute Pig Latin statements and Pig commands) using 
various modes.</p>
+    <table>
+    <tr>
+    <td></td>
     <td><strong>Local Mode</strong></td>
     <td><strong>Tez Local Mode</strong></td>
+    <td><strong>Spark Local Mode</strong></td>
     <td><strong>Mapreduce Mode</strong></td>
     <td><strong>Tez Mode</strong></td>
-       </tr>
-       <tr>
-       <td><strong>Interactive Mode </strong></td>
+    <td><strong>Spark Mode</strong></td>
+    </tr>
+    <tr>
+    <td><strong>Interactive Mode </strong></td>
     <td>yes</td>
     <td>experimental</td>
     <td>yes</td>
     <td>yes</td>
-       </tr>
-       <tr>
-       <td><strong>Batch Mode</strong> </td>
+    </tr>
+    <tr>
+    <td><strong>Batch Mode</strong> </td>
     <td>yes</td>
     <td>experimental</td>
     <td>yes</td>
     <td>yes</td>
-       </tr>
-       </table>
-       
-       <!-- ++++++++++++++++++++++++++++++++++ -->
-          <section id="execution-modes">
-       <title>Execution Modes</title> 
-<p>Pig has two execution modes or exectypes: </p>
+    </tr>
+    </table>
+
+    <!-- ++++++++++++++++++++++++++++++++++ -->
+       <section id="execution-modes">
+    <title>Execution Modes</title>
+<p>Pig has six execution modes or exectypes: </p>
 <ul>
 <li><strong>Local Mode</strong> - To run Pig in local mode, you need access to 
a single machine; all files are installed and run using your local host and 
file system. Specify local mode using the -x flag (pig -x local).
 </li>
 <li><strong>Tez Local Mode</strong> - To run Pig in tez local mode. It is 
similar to local mode, except internally Pig will invoke tez runtime engine. 
Specify Tez local mode using the -x flag (pig -x tez_local).
 <p><strong>Note:</strong> Tez local mode is experimental. There are some 
queries which just error out on bigger data in local mode.</p>
 </li>
+<li><strong>Spark Local Mode</strong> - To run Pig in Spark local mode. It is similar to local mode, except that internally Pig will invoke the Spark runtime engine. Specify Spark local mode using the -x flag (pig -x spark_local).
+<p><strong>Note:</strong> Spark local mode is experimental. There are some queries which just error out on bigger data in local mode.</p>
+</li>
 <li><strong>Mapreduce Mode</strong> - To run Pig in mapreduce mode, you need 
access to a Hadoop cluster and HDFS installation. Mapreduce mode is the default 
mode; you can, <em>but don't need to</em>, specify it using the -x flag (pig OR 
pig -x mapreduce).
 </li>
 <li><strong>Tez Mode</strong> - To run Pig in Tez mode, you need access to a 
Hadoop cluster and HDFS installation. Specify Tez mode using the -x flag (-x 
tez).
 </li>
+<li><strong>Spark Mode</strong> - To run Pig in Spark mode, you need access to a Spark, Yarn or Mesos cluster and HDFS installation. Specify Spark mode using the -x flag (-x spark). In Spark execution mode, it is necessary to set env::SPARK_MASTER to an appropriate value (local - local mode, yarn-client - yarn-client mode, mesos://host:port - Spark on Mesos or spark://host:port - Spark cluster; for more information refer to the Spark documentation on Master URLs; <em>yarn-cluster mode is currently not supported</em>). Pig scripts run on Spark can take advantage of the <a href="http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation">dynamic allocation</a> feature. The feature can be enabled by simply setting <em>spark.dynamicAllocation.enabled</em> to true. Refer to the Spark <a href="http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation">configuration</a> page for additional configuration details. In general all properties in the Pig script prefixed with <em>spark.</em> are copied to the Spark Application Configuration. Please note that the Spark shuffle service needs to be enabled as a YARN auxiliary service for this to work. See the Spark documentation for additional details.
+</li>
 </ul>
 <p></p>
 
@@ -148,6 +155,9 @@ $ pig -x local ...
 /* Tez local mode */
 $ pig -x tez_local ...
  
+/* Spark local mode */
+$ pig -x spark_local ...
+
 /* mapreduce mode */
 $ pig ...
 or
@@ -155,6 +165,9 @@ $ pig -x mapreduce ...
 
 /* Tez mode */
 $ pig -x tez ...
+
+/* Spark mode */
+$ pig -x spark ...
 </source>
 
 </section>
@@ -179,7 +192,7 @@ grunt&gt; dump B;
 <source>
 $ pig -x local
 ... - Connecting to ...
-grunt> 
+grunt>
 </source>
 
 <p><strong>Tez Local Mode</strong></p>
@@ -189,6 +202,13 @@ $ pig -x tez_local
 grunt> 
 </source>
 
+<p><strong>Spark Local Mode</strong></p>
+<source>
+$ pig -x spark_local
+... - Connecting to ...
+grunt> 
+</source>
+
 <p><strong>Mapreduce Mode</strong> </p>
 <source>
 $ pig -x mapreduce
@@ -208,6 +228,14 @@ $ pig -x tez
 ... - Connecting to ...
 grunt> 
 </source>
+
+<p><strong>Spark Mode</strong> </p>
+<source>
+$ pig -x spark
+... - Connecting to ...
+grunt>
+</source>
+
 </section>
 </section>
 
@@ -237,6 +265,10 @@ $ pig -x local id.pig
 <source>
 $ pig -x tez_local id.pig
 </source>
+<p><strong>Spark Local Mode</strong></p>
+<source>
+$ pig -x spark_local id.pig
+</source>
 <p><strong>Mapreduce Mode</strong> </p>
 <source>
 $ pig id.pig
@@ -247,22 +279,26 @@ $ pig -x mapreduce id.pig
 <source>
 $ pig -x tez id.pig
 </source>
+<p><strong>Spark Mode</strong> </p>
+<source>
+$ pig -x spark id.pig
+</source>
 </section>
 
   <!-- ==================================================================== -->
-    
+
    <!-- PIG SCRIPTS -->
    <section id="pig-scripts">
-       <title>Pig Scripts</title>
-       
-<p>Use Pig scripts to place Pig Latin statements and Pig commands in a single 
file. While not required, it is good practice to identify the file using the 
*.pig extension.</p>        
-       
+    <title>Pig Scripts</title>
+
+<p>Use Pig scripts to place Pig Latin statements and Pig commands in a single 
file. While not required, it is good practice to identify the file using the 
*.pig extension.</p>
+
 <p>You can run Pig scripts from the command line and from the Grunt shell
 (see the <a href="cmds.html#run">run</a> and <a href="cmds.html#exec">exec</a> 
commands). </p>
-       
+
 <p>Pig scripts allow you to pass values to parameters using <a 
href="cont.html#Parameter-Sub">parameter substitution</a>. </p>
 
-<!-- +++++++++++++++++++++++++++++++++++++++++++ -->   
+<!-- +++++++++++++++++++++++++++++++++++++++++++ -->    
    <p id="comments"><strong>Comments in Scripts</strong></p>
    
    <p>You can include comments in Pig scripts:</p>
@@ -284,8 +320,8 @@ A = LOAD 'student' USING PigStorage() AS
 B = FOREACH A GENERATE name;  -- transforming data
 DUMP B;  -- retrieving results
 </source>   
-       
-<!-- +++++++++++++++++++++++++++++++++++++++++++ -->           
+    
+<!-- +++++++++++++++++++++++++++++++++++++++++++ -->        
 
 <p id="dfs"><strong>Scripts and Distributed File Systems</strong></p>
 
@@ -293,7 +329,7 @@ DUMP B;  -- retrieving results
 <source>
 $ pig hdfs://nn.mydomain.com:9020/myscripts/script.pig
 </source> 
-</section>     
+</section>    
 </section>
 </section>
 
@@ -355,7 +391,7 @@ hadoop.security.krb5.keytab=/home/niels/
     
    <!-- PIG LATIN STATEMENTS -->
    <section id="pl-statements">
-       <title>Pig Latin Statements</title>     
+    <title>Pig Latin Statements</title>    
    <p>Pig Latin statements are the basic constructs you use to process data 
using Pig. 
    A Pig Latin statement is an operator that takes a <a 
href="basic.html#relations">relation</a> as input and produces another relation 
as output. 
    (This definition applies to all Pig Latin operators except LOAD and STORE 
which read data from and write data to the file system.) 
@@ -496,20 +532,20 @@ However, in a production environment you
 <p></p>
 <p id="pig-properties">To specify Pig properties use one of these 
mechanisms:</p>
 <ul>
-       <li>The pig.properties file (add the directory that contains the 
pig.properties file to the classpath)</li>
-       <li>The -D and a Pig property in PIG_OPTS environment variable (export 
PIG_OPTS=-Dpig.tmpfilecompression=true)</li>
-       <li>The -P command line option and a properties file (pig -P 
mypig.properties)</li>
-       <li>The <a href="cmds.html#set">set</a> command (set 
pig.exec.nocombiner true)</li>
+    <li>The pig.properties file (add the directory that contains the 
pig.properties file to the classpath)</li>
+    <li>The -D and a Pig property in PIG_OPTS environment variable (export 
PIG_OPTS=-Dpig.tmpfilecompression=true)</li>
+    <li>The -P command line option and a properties file (pig -P 
mypig.properties)</li>
+    <li>The <a href="cmds.html#set">set</a> command (set pig.exec.nocombiner 
true)</li>
 </ul>
 <p><strong>Note:</strong> The properties file uses standard Java property file 
format.</p>
 <p>The following precedence order is supported: pig.properties &lt; -D Pig 
property &lt; -P properties file &lt; set command. This means that if the same 
property is provided using the –D command line option as well as the –P 
command line option (properties file), the value of the property in the 
properties file will take precedence.</p>
 
 <p id="hadoop-properties">To specify Hadoop properties you can use the same 
mechanisms:</p>
 <ul>
-       <li>Hadoop configuration files (include 
pig-cluster-hadoop-site.xml)</li>
-       <li>The -D and a Hadoop property in PIG_OPTS environment variable 
(export PIG_OPTS=–Dmapreduce.task.profile=true) </li>
-       <li>The -P command line option and a property file (pig -P 
property_file)</li>
-       <li>The <a href="cmds.html#set">set</a> command (set 
mapred.map.tasks.speculative.execution false)</li>
+    <li>Hadoop configuration files (include pig-cluster-hadoop-site.xml)</li>
+    <li>The -D and a Hadoop property in PIG_OPTS environment variable (export 
PIG_OPTS=–Dmapreduce.task.profile=true) </li>
+    <li>The -P command line option and a property file (pig -P 
property_file)</li>
+    <li>The <a href="cmds.html#set">set</a> command (set 
mapred.map.tasks.speculative.execution false)</li>
 </ul>
 <p></p>
 <p>The same precedence holds: Hadoop configuration files &lt; -D Hadoop 
property &lt; -P properties_file &lt; set command.</p>
@@ -523,7 +559,7 @@ However, in a production environment you
   <section id="tutorial">
 <title>Pig Tutorial </title>
 
-<p>The Pig tutorial shows you how to run Pig scripts using Pig's local mode, mapreduce mode and Tez mode (see <a href="#execution-modes">Execution Modes</a>).</p>
+<p>The Pig tutorial shows you how to run Pig scripts using Pig's local mode, mapreduce mode, Tez mode and Spark mode (see <a href="#execution-modes">Execution Modes</a>).</p>
 
 <p>To get started, do the following preliminary tasks:</p>
 
@@ -541,8 +577,8 @@ $ export PIG_HOME=/&lt;my-path-to-pig&gt
 <li>Create the pigtutorial.tar.gz file:
 <ul>
     <li>Move to the Pig tutorial directory (.../pig-0.16.0/tutorial).</li>
-       <li>Run the "ant" command from the tutorial directory. This will create 
the pigtutorial.tar.gz file.
-       </li>
+    <li>Run the "ant" command from the tutorial directory. This will create 
the pigtutorial.tar.gz file.
+    </li>
 </ul>
 
 </li>
@@ -574,6 +610,10 @@ Or if you are using Tez local mode:
 <source>
 $ pig -x tez_local script1-local.pig
 </source>
+Or if you are using Spark local mode:
+<source>
+$ pig -x spark_local script1-local.pig
+</source>
 </li>
 <li>Review the result files, located in the script1-local-results.txt 
directory.
 <p>The output may contain a few Hadoop warnings which can be ignored:</p>
@@ -587,7 +627,7 @@ $ pig -x tez_local script1-local.pig
 
  <!-- ++++++++++++++++++++++++++++++++++ --> 
 <section>
-<title> Running the Pig Scripts in Mapreduce Mode or Tez Mode</title>
+<title> Running the Pig Scripts in Mapreduce Mode, Tez Mode or Spark Mode</title>
 
 <p>To run the Pig scripts in mapreduce mode, do the following: </p>
 <ol>
@@ -606,6 +646,8 @@ export PIG_CLASSPATH=/mycluster/conf
 <source>
 export PIG_CLASSPATH=/mycluster/conf:/tez/conf
 </source>
+<p>If you are using Spark, you will also need to specify SPARK_HOME and SPARK_JAR, which is the hdfs location where you uploaded $SPARK_HOME/lib/spark-assembly*.jar:</p>
+<source>export SPARK_HOME=/mysparkhome/; export SPARK_JAR=hdfs://example.com:8020/spark-assembly*.jar</source>
 <p><strong>Note:</strong> The PIG_CLASSPATH can also be used to add any other 
3rd party dependencies or resource files a pig script may require. If there is 
also a need to make the added entries take the highest precedence in the Pig 
JVM's classpath order, one may also set the env-var PIG_USER_CLASSPATH_FIRST to 
any value, such as 'true' (and unset the env-var to disable).</p></li>
 <li>Set the HADOOP_CONF_DIR environment variable to the location of the 
cluster configuration directory:
 <source>
@@ -620,6 +662,10 @@ Or if you are using Tez:
 <source>
 $ pig -x tez script1-hadoop.pig
 </source>
+Or if you are using Spark:
+<source>
+$ pig -x spark script1-hadoop.pig
+</source>
 </li>
 
 <li>Review the result files, located in the script1-hadoop-results or script2-hadoop-results HDFS directory:

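Putting the documentation changes above together, running the tutorial script on a cluster in the new mode would look roughly like this (cluster paths and the master setting are placeholders taken from the examples in the doc):

    export PIG_CLASSPATH=/mycluster/conf
    export HADOOP_CONF_DIR=/mycluster/conf
    export SPARK_HOME=/mysparkhome/
    export SPARK_JAR=hdfs://example.com:8020/spark-assembly*.jar
    export SPARK_MASTER=yarn-client
    pig -x spark script1-hadoop.pig
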
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Mon May 29 15:00:39 2017
@@ -482,6 +482,11 @@ public class PigConfiguration {
     public static final String PIG_LOG_TRACE_ID = "pig.log.trace.id";
 
     /**
+     * Use Netty file server for Pig on Spark, true or false, default value is false
+     */
+    public static final String PIG_SPARK_USE_NETTY_FILESERVER = "pig.spark.rpc.useNettyFileServer";
+
+    /**
      * @deprecated use {@link #PIG_LOG_TRACE_ID} instead. Will be removed in 
Pig 0.18
      */
     public static final String CALLER_ID = PIG_LOG_TRACE_ID;

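The new constant is an ordinary Pig property, so using the property mechanisms described in start.xml it could be turned on for a Spark run roughly like this (script name is hypothetical):

    export PIG_OPTS="$PIG_OPTS -Dpig.spark.rpc.useNettyFileServer=true"
    pig -x spark myscript.pig
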
Modified: pig/trunk/src/org/apache/pig/PigWarning.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigWarning.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigWarning.java (original)
+++ pig/trunk/src/org/apache/pig/PigWarning.java Mon May 29 15:00:39 2017
@@ -68,6 +68,8 @@ public enum PigWarning {
     DELETE_FAILED,
     PROJECTION_INVALID_RANGE,
     NO_LOAD_FUNCTION_FOR_CASTING_BYTEARRAY,
-    SKIP_UDF_CALL_FOR_NULL
+    SKIP_UDF_CALL_FOR_NULL,
+    SPARK_WARN, //bulk collection of warnings under Spark exec engine
+    SPARK_CUSTOM_WARN // same as above but for custom UDF warnings only, see PIG-2207
     ;
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java Mon May 29 15:00:39 2017
@@ -35,6 +35,6 @@ public class AccumulatorOptimizer extend
     }
 
     public void visitMROp(MapReduceOper mr) throws VisitorException {
-        AccumulatorOptimizerUtil.addAccumulator(mr.reducePlan);
+        AccumulatorOptimizerUtil.addAccumulator(mr.reducePlan, mr.reducePlan.getRoots());
     }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java Mon May 29 15:00:39 2017
@@ -28,7 +28,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
-import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.Pair;
@@ -69,7 +68,7 @@ class NoopFilterRemover extends MROpPlan
         public void visit() throws VisitorException {
             super.visit();
             for (Pair<POFilter, PhysicalPlan> pair: removalQ) {
-                removeFilter(pair.first, pair.second);
+                NoopFilterRemoverUtil.removeFilter(pair.first, pair.second);
             }
             removalQ.clear();
         }
@@ -91,23 +90,5 @@ class NoopFilterRemover extends MROpPlan
                 }
             }
         }
-        
-        private void removeFilter(POFilter filter, PhysicalPlan plan) {
-            if (plan.size() > 1) {
-                try {
-                    List<PhysicalOperator> fInputs = filter.getInputs();
-                    List<PhysicalOperator> sucs = plan.getSuccessors(filter);
-
-                    plan.removeAndReconnect(filter);
-                    if(sucs!=null && sucs.size()!=0){
-                        for (PhysicalOperator suc : sucs) {
-                            suc.setInputs(fInputs);
-                        }
-                    }
-                } catch (PlanException pe) {
-                    log.info("Couldn't remove a filter in optimizer: "+pe.getMessage());
-                }
-            }
-        }
     }
 }

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemoverUtil.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemoverUtil.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemoverUtil.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemoverUtil.java Mon May 29 15:00:39 2017
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.impl.plan.PlanException;
+
+import java.util.List;
+
+public class NoopFilterRemoverUtil {
+    private static Log log = LogFactory.getLog(NoopFilterRemoverUtil.class);
+
+    public static void removeFilter(POFilter filter, PhysicalPlan plan) {
+        if (plan.size() > 1) {
+            try {
+                List<PhysicalOperator> fInputs = filter.getInputs();
+                List<PhysicalOperator> sucs = plan.getSuccessors(filter);
+
+                plan.removeAndReconnect(filter);
+                if(sucs!=null && sucs.size()!=0){
+                    for (PhysicalOperator suc : sucs) {
+                        suc.setInputs(fInputs);
+                    }
+                }
+            } catch (PlanException pe) {
+                log.info("Couldn't remove a filter in optimizer: "+pe.getMessage());
+            }
+        }
+    }
+}

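The point of extracting removeFilter() into this standalone util is that plan layers other than the MapReduce visitor above can drop no-op filters through the same code path. A minimal sketch of such a caller (the SparkNoopFilterCleaner class name is illustrative, not part of this commit):

    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NoopFilterRemoverUtil;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;

    public class SparkNoopFilterCleaner {
        // Delegates to the shared util: it re-wires the filter's successors to the
        // filter's inputs via plan.removeAndReconnect() and logs (rather than
        // propagates) any PlanException.
        public void clean(POFilter noopFilter, PhysicalPlan plan) {
            NoopFilterRemoverUtil.removeFilter(noopFilter, plan);
        }
    }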
Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
 Mon May 29 15:00:39 2017
@@ -23,7 +23,7 @@ import org.apache.pig.EvalFunc;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
-import org.apache.pig.tools.pigstats.PigStatusReporter;
+import org.apache.pig.tools.pigstats.PigWarnCounter;
 
 /**
  *
@@ -36,7 +36,7 @@ public final class PigHadoopLogger imple
     private static Log log = LogFactory.getLog(PigHadoopLogger.class);
     private static PigHadoopLogger logger = null;
 
-    private PigStatusReporter reporter = null;
+    private PigWarnCounter reporter = null;
     private boolean aggregate = false;
 
     private PigHadoopLogger() {
@@ -52,7 +52,7 @@ public final class PigHadoopLogger imple
         return logger;
     }
 
-    public void setReporter(PigStatusReporter reporter) {
+    public void setReporter(PigWarnCounter reporter) {
         this.reporter = reporter;
     }
 
@@ -65,10 +65,10 @@ public final class PigHadoopLogger imple
         if (getAggregate()) {
             if (reporter != null) {
                 if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) {
-                    reporter.incrCounter(className, warningEnum.name(), 1);
+                    reporter.incrWarnCounter(className, warningEnum.name(), 1L);
                 }
                 // For backwards compatibility, always report with warningEnum, see PIG-3739
-                reporter.incrCounter(warningEnum, 1);
+                reporter.incrWarnCounter(warningEnum, 1L);
             } else {
                 //TODO:
                 //in local mode of execution if the PigHadoopLogger is used initially,

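The net effect of this change is that PigHadoopLogger no longer depends on the Hadoop-specific PigStatusReporter; any PigWarnCounter implementation can back the aggregated UDF warnings. A minimal wiring sketch, assuming the existing getInstance() singleton accessor and with warnCounter standing in for whichever backend-specific counter is in use:

    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
    import org.apache.pig.tools.pigstats.PigWarnCounter;

    public class WarnCounterWiring {
        // The logger only ever calls incrWarnCounter(...) on the reporter, as shown
        // in the hunk above, so any PigWarnCounter works here.
        public static void wire(PigWarnCounter warnCounter) {
            PigHadoopLogger.getInstance().setReporter(warnCounter);
        }
    }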
Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
 Mon May 29 15:00:39 2017
@@ -61,14 +61,6 @@ public class PigInputFormat extends Inpu
     public static final String PIG_INPUT_SIGNATURES = "pig.inpSignatures";
     public static final String PIG_INPUT_LIMITS = "pig.inpLimits";
 
-    /**
-     * @deprecated Use {@link UDFContext} instead in the following way to get
-     * the job's {@link Configuration}:
-     * <pre>UdfContext.getUdfContext().getJobConf()</pre>
-     */
-    @Deprecated
-    public static Configuration sJob;
-
     /* (non-Javadoc)
      * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)
      */
@@ -78,43 +70,66 @@ public class PigInputFormat extends Inpu
             org.apache.hadoop.mapreduce.InputSplit split,
             TaskAttemptContext context) throws IOException,
             InterruptedException {
-        // We need to create a TaskAttemptContext based on the Configuration which
-        // was used in the getSplits() to produce the split supplied here. For
-        // this, let's find out the input of the script which produced the split
-        // supplied here and then get the corresponding Configuration and setup
-        // TaskAttemptContext based on it and then call the real InputFormat's
-        // createRecordReader() method
-
-        PigSplit pigSplit = (PigSplit)split;
-        activeSplit = pigSplit;
-        // XXX hadoop 20 new API integration: get around a hadoop 20 bug by
-        // passing total # of splits to each split so it can be retrieved
-        // here and set it to the configuration object. This number is needed
-        // by PoissonSampleLoader to compute the number of samples
-        int n = pigSplit.getTotalSplits();
-        context.getConfiguration().setInt("pig.mapsplits.count", n);
-        Configuration conf = context.getConfiguration();
-        PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
-                .deserialize(conf.get("udf.import.list")));
-        MapRedUtil.setupUDFContext(conf);
-        LoadFunc loadFunc = getLoadFunc(pigSplit.getInputIndex(), conf);
-        // Pass loader signature to LoadFunc and to InputFormat through
-        // the conf
-        passLoadSignature(loadFunc, pigSplit.getInputIndex(), conf);
-
-        // merge entries from split specific conf into the conf we got
-        PigInputFormat.mergeSplitSpecificConf(loadFunc, pigSplit, conf);
-
-        // for backward compatibility
-        PigInputFormat.sJob = conf;
+        RecordReaderFactory factory = new RecordReaderFactory(split, context);
+        return factory.createRecordReader();
+    }
 
-        InputFormat inputFormat = loadFunc.getInputFormat();
 
-        List<Long> inpLimitLists =
-                (ArrayList<Long>)ObjectSerializer.deserialize(
-                        conf.get(PIG_INPUT_LIMITS));
+    /**
+     * Helper class to create record reader
+     */
+    protected static class RecordReaderFactory {
+        protected InputFormat inputFormat;
+        protected PigSplit pigSplit;
+        protected LoadFunc loadFunc;
+        protected TaskAttemptContext context;
+        protected long limit;
+
+        public RecordReaderFactory(org.apache.hadoop.mapreduce.InputSplit split,
+                                   TaskAttemptContext context) throws IOException {
+
+            // We need to create a TaskAttemptContext based on the Configuration which
+            // was used in the getSplits() to produce the split supplied here. For
+            // this, let's find out the input of the script which produced the split
+            // supplied here and then get the corresponding Configuration and setup
+            // TaskAttemptContext based on it and then call the real InputFormat's
+            // createRecordReader() method
+
+            PigSplit pigSplit = (PigSplit)split;
+            // XXX hadoop 20 new API integration: get around a hadoop 20 bug by
+            // passing total # of splits to each split so it can be retrieved
+            // here and set it to the configuration object. This number is needed
+            // by PoissonSampleLoader to compute the number of samples
+            int n = pigSplit.getTotalSplits();
+            context.getConfiguration().setInt("pig.mapsplits.count", n);
+            Configuration conf = context.getConfiguration();
+            PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
+                    .deserialize(conf.get("udf.import.list")));
+            MapRedUtil.setupUDFContext(conf);
+            LoadFunc loadFunc = getLoadFunc(pigSplit.getInputIndex(), conf);
+            // Pass loader signature to LoadFunc and to InputFormat through
+            // the conf
+            passLoadSignature(loadFunc, pigSplit.getInputIndex(), conf);
+
+            // merge entries from split specific conf into the conf we got
+            PigInputFormat.mergeSplitSpecificConf(loadFunc, pigSplit, conf);
+
+            InputFormat inputFormat = loadFunc.getInputFormat();
+
+            List<Long> inpLimitLists =
+                    (ArrayList<Long>)ObjectSerializer.deserialize(
+                            conf.get(PIG_INPUT_LIMITS));
+
+            this.inputFormat = inputFormat;
+            this.pigSplit = pigSplit;
+            this.loadFunc = loadFunc;
+            this.context = context;
+            this.limit = inpLimitLists.get(pigSplit.getInputIndex());
+        }
 
-        return new PigRecordReader(inputFormat, pigSplit, loadFunc, context, inpLimitLists.get(pigSplit.getInputIndex()));
+        public org.apache.hadoop.mapreduce.RecordReader<Text, Tuple> createRecordReader() throws IOException, InterruptedException {
+            return new PigRecordReader(inputFormat, pigSplit, loadFunc, context, limit);
+        }
     }
 
 
@@ -339,10 +354,4 @@ public class PigInputFormat extends Inpu
         return pigSplit;
     }
 
-    public static PigSplit getActiveSplit() {
-        return activeSplit;
-    }
-
-    private static PigSplit activeSplit;
-
 }

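The refactor above keeps createRecordReader() behaviour unchanged but moves all of the split/Configuration/LoadFunc setup into the protected RecordReaderFactory, so a non-MR launcher can reuse that plumbing directly. A sketch under the assumption that such a factory lives in the same package (the SparkRecordReaderFactory name is hypothetical):

    package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    // Reuses the UDFContext / LoadFunc / split-conf setup done in the base
    // constructor; subclasses only need to decide how or when the final
    // RecordReader is created via createRecordReader().
    public class SparkRecordReaderFactory extends PigInputFormat.RecordReaderFactory {
        public SparkRecordReaderFactory(InputSplit split, TaskAttemptContext context)
                throws IOException {
            super(split, context);
        }
    }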
Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
 Mon May 29 15:00:39 2017
@@ -48,6 +48,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -125,6 +126,12 @@ public class PigSplit extends InputSplit
      */
     String[] locations = null;
 
+
+    /**
+     * overall splitLocationInfos
+     */
+    SplitLocationInfo[] splitLocationInfos = null;
+
     // this seems necessary for Hadoop to instatiate this split on the
     // backend
     public PigSplit() {}
@@ -201,6 +208,51 @@ public class PigSplit extends InputSplit
         return locations;
     }
 
+
+    @Override
+    public SplitLocationInfo[] getLocationInfo() throws IOException {
+        if (splitLocationInfos == null) {
+            HashMap<SplitLocationInfo, Long> locMap = new HashMap<SplitLocationInfo, Long>();
+            Long lenInMap;
+            for (InputSplit split : wrappedSplits) {
+                SplitLocationInfo[] locs = split.getLocationInfo();
+                if( locs != null) {
+                    for (SplitLocationInfo loc : locs) {
+                        try {
+                            if ((lenInMap = locMap.get(loc)) == null)
+                                locMap.put(loc, split.getLength());
+                            else
+                                locMap.put(loc, lenInMap + split.getLength());
+                        } catch (InterruptedException e) {
+                            throw new IOException("InputSplit.getLength throws exception: ", e);
+                        }
+                    }
+                }
+            }
+            Set<Map.Entry<SplitLocationInfo, Long>> entrySet = locMap.entrySet();
+            Map.Entry<SplitLocationInfo, Long>[] hostSize =
+                    entrySet.toArray(new Map.Entry[entrySet.size()]);
+            Arrays.sort(hostSize, new Comparator<Map.Entry<SplitLocationInfo, Long>>() {
+
+                @Override
+                public int compare(Entry<SplitLocationInfo, Long> o1, Entry<SplitLocationInfo, Long> o2) {
+                    long diff = o1.getValue() - o2.getValue();
+                    if (diff < 0) return 1;
+                    if (diff > 0) return -1;
+                    return 0;
+                }
+            });
+            // maximum 5 locations are in list: refer to PIG-1648 for more details
+            int nHost = Math.min(hostSize.length, 5);
+            splitLocationInfos = new SplitLocationInfo[nHost];
+            for (int i = 0; i < nHost; ++i) {
+                splitLocationInfos[i] = hostSize[i].getKey();
+            }
+        }
+        return splitLocationInfos;
+    }
+
+
     @Override
     public long getLength() throws IOException, InterruptedException {
         if (length == -1) {

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java
 Mon May 29 15:00:39 2017
@@ -52,7 +52,8 @@ public class SecondaryKeyOptimizerMR ext
         if (mr.getCustomPartitioner()!=null)
             return;
 
-        info = SecondaryKeyOptimizerUtil.applySecondaryKeySort(mr.mapPlan, mr.reducePlan);
+        SecondaryKeyOptimizerUtil secondaryKeyOptUtil = new SecondaryKeyOptimizerUtil();
+        info = secondaryKeyOptUtil.applySecondaryKeySort(mr.mapPlan, mr.reducePlan);
         if (info != null && info.isUseSecondaryKey()) {
             mr.setUseSecondaryKey(true);
             mr.setSecondarySortOrder(info.getSecondarySortOrder());

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
 Mon May 29 15:00:39 2017
@@ -497,6 +497,10 @@ public abstract class PhysicalOperator e
        parentPlan = physicalPlan;
     }
 
+    public PhysicalPlan getParentPlan() {
+        return parentPlan;
+    }
+
     public Log getLogger() {
         return log;
     }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
 Mon May 29 15:00:39 2017
@@ -120,6 +120,10 @@ public class POUserFunc extends Expressi
         instantiateFunc(funcSpec);
     }
 
+    public void setFuncInputSchema(){
+        setFuncInputSchema(signature);
+    }
+
     private void instantiateFunc(FuncSpec fSpec) {
         this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
         this.setSignature(signature);

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
 Mon May 29 15:00:39 2017
@@ -73,6 +73,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
 import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.PlanWalker;
 import org.apache.pig.impl.plan.VisitorException;
@@ -363,4 +364,8 @@ public class PhyPlanVisitor extends Plan
 
     public void visitPoissonSample(POPoissonSample poissonSample) throws VisitorException {
     }
+
+    public void visitBroadcastSpark(POBroadcastSpark poBroadcastSpark) {
+    }
+
 }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
 Mon May 29 15:00:39 2017
@@ -143,13 +143,6 @@ public class PhysicalPlan extends Operat
         to.setInputs(getPredecessors(to));
     }
 
-    /*public void connect(List<PhysicalOperator> from, PhysicalOperator to) throws IOException{
-        if(!to.supportsMultipleInputs()){
-            throw new IOException("Invalid Operation on " + to.name() + ". It doesn't support multiple inputs.");
-        }
-
-    }*/
-
     @Override
     public void remove(PhysicalOperator op) {
         op.setInputs(null);

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POBroadcastSpark.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POBroadcastSpark.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POBroadcastSpark.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POBroadcastSpark.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class POBroadcastSpark extends PhysicalOperator {
+    private static final long serialVersionUID = 1L;
+
+    protected String broadcastedVariableName;
+
+    public POBroadcastSpark(OperatorKey k) {
+        super(k);
+    }
+
+    public POBroadcastSpark(POBroadcastSpark copy)
+            throws ExecException {
+        super(copy);
+    }
+
+    /**
+     * Set your broadcast variable name so that
+     * BroadcastConverter can put this broadcasted variable in a map
+     * which can be referenced by other functions / closures in Converters
+     *
+     * @param varName
+     */
+    public void setBroadcastedVariableName(String varName) {
+        broadcastedVariableName = varName;
+    }
+
+    public String getBroadcastedVariableName() {
+        return broadcastedVariableName;
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return null;
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "BroadcastSpark - " + mKey.toString();
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitBroadcastSpark(this);
+    }
+}

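POBroadcastSpark is a marker operator: it only carries the name under which the converter layer registers and later looks up the broadcast variable. A sketch of how a plan builder might create one (naming the broadcast after the operator key is an assumption made for illustration; the commit only requires that the same name is used on both sides):

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
    import org.apache.pig.impl.plan.NodeIdGenerator;
    import org.apache.pig.impl.plan.OperatorKey;

    public class BroadcastSparkExample {
        public static POBroadcastSpark newBroadcast(String scope) {
            // Allocate a fresh operator key in the current plan scope.
            OperatorKey key = new OperatorKey(scope,
                    NodeIdGenerator.getGenerator().getNextNodeId(scope));
            POBroadcastSpark broadcast = new POBroadcastSpark(key);
            // Converters later resolve this name in their map of broadcast variables.
            broadcast.setBroadcastedVariableName(key.toString());
            return broadcast;
        }
    }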
Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
 Mon May 29 15:00:39 2017
@@ -70,6 +70,15 @@ public class POCollectedGroup extends Ph
 
     private transient boolean useDefaultBag;
 
+    //For Spark
+    private transient boolean endOfInput = false;
+    public boolean isEndOfInput() {
+        return endOfInput;
+    }
+    public void setEndOfInput (boolean isEndOfInput) {
+        endOfInput = isEndOfInput;
+    }
+
     public POCollectedGroup(OperatorKey k) {
         this(k, -1, null);
     }
@@ -132,7 +141,7 @@ public class POCollectedGroup extends Ph
             if (inp.returnStatus == POStatus.STATUS_EOP) {
                 // Since the output is buffered, we need to flush the last
                 // set of records when the close method is called by mapper.
-                if (this.parentPlan.endOfAllInput) {
+                if (this.parentPlan.endOfAllInput || isEndOfInput()) {
                     return getStreamCloseResult();
                 } else {
                     break;
@@ -257,13 +266,13 @@ public class POCollectedGroup extends Ph
             leafOps.add(leaf);
         }
    }
-    
+
     private void setIllustratorEquivalenceClasses(Tuple tin) {
         if (illustrator != null) {
           illustrator.getEquivalenceClasses().get(0).add(tin);
         }
     }
-    
+
     @Override
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
         return null;

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
 Mon May 29 15:00:39 2017
@@ -519,6 +519,10 @@ public class POFRJoin extends PhysicalOp
         return LRs;
     }
 
+    public boolean isLeftOuterJoin() {
+        return isLeftOuterJoin;
+    }
+
     public int getFragment() {
         return fragment;
     }

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.SchemaTupleBackend;
+import org.apache.pig.data.SchemaTupleClassGenerator;
+import org.apache.pig.data.SchemaTupleFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import java.util.List;
+import java.util.Map;
+
+public class POFRJoinSpark extends POFRJoin {
+    private static final Log log = LogFactory.getLog(POFRJoinSpark.class);
+
+    private Map<String, List<Tuple>> broadcasts;
+
+    public POFRJoinSpark(POFRJoin copy) throws ExecException {
+        super(copy);
+    }
+
+    @Override
+    protected void setUpHashMap() throws ExecException {
+        log.info("Building replication hash table");
+
+        SchemaTupleFactory[] inputSchemaTupleFactories = new SchemaTupleFactory[inputSchemas.length];
+        SchemaTupleFactory[] keySchemaTupleFactories = new SchemaTupleFactory[inputSchemas.length];
+        for (int i = 0; i < inputSchemas.length; i++) {
+            addSchemaToFactories(inputSchemas[i], inputSchemaTupleFactories, i);
+            addSchemaToFactories(keySchemas[i], keySchemaTupleFactories, i);
+        }
+
+        replicates.set(fragment, null);
+        int i = -1;
+        long start = System.currentTimeMillis();
+        for (int k = 0; k < inputSchemas.length; ++k) {
+            ++i;
+
+            SchemaTupleFactory inputSchemaTupleFactory = inputSchemaTupleFactories[i];
+            SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i];
+
+            if (i == fragment) {
+                replicates.set(i, null);
+                continue;
+            }
+
+            TupleToMapKey replicate = new TupleToMapKey(1000, keySchemaTupleFactory);
+
+            log.debug("Completed setup. Trying to build replication hash table");
+            List<Tuple> tuples = broadcasts.get(parentPlan.getPredecessors(this).get(i).getOperatorKey().toString());
+
+            POLocalRearrange localRearrange = LRs[i];
+
+            for (Tuple t : tuples) {
+                localRearrange.attachInput(t);
+                Result res = localRearrange.getNextTuple();
+                if (getReporter() != null) {
+                    getReporter().progress();
+                }
+                Tuple tuple = (Tuple) res.result;
+                if (isKeyNull(tuple.get(1))) continue;
+                Object key = tuple.get(1);
+                Tuple value = getValueTuple(localRearrange, tuple);
+
+                if (replicate.get(key) == null) {
+                    replicate.put(key, new POMergeJoin.TuplesToSchemaTupleList(1, inputSchemaTupleFactory));
+                }
+
+                replicate.get(key).add(value);
+
+            }
+            replicates.set(i, replicate);
+        }
+        long end = System.currentTimeMillis();
+        log.debug("Hash Table built. Time taken: " + (end - start));
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "FRJoinSpark[" + DataType.findTypeName(resultType)
+                + "]" + " - " + mKey.toString();
+    }
+
+    private void addSchemaToFactories(Schema schema, SchemaTupleFactory[] schemaTupleFactories, int index) {
+        if (schema != null) {
+            log.debug("Using SchemaTuple for FR Join Schema: " + schema);
+            schemaTupleFactories[index] = SchemaTupleBackend.newSchemaTupleFactory(schema, false, SchemaTupleClassGenerator.GenContext.FR_JOIN);
+        }
+    }
+
+    public void attachInputs(Map<String, List<Tuple>> broadcasts) {
+        this.broadcasts = broadcasts;
+    }
+}

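Unlike the base POFRJoin, which reads its replicated inputs itself, POFRJoinSpark expects the already-collected tuples to be handed in via attachInputs() before setUpHashMap() runs. A minimal sketch of that contract (caller names are hypothetical; the key must be the OperatorKey string of the corresponding predecessor, which is exactly what setUpHashMap() looks up):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoinSpark;
    import org.apache.pig.data.Tuple;

    public class FRJoinSparkWiring {
        public static void attach(POFRJoinSpark join, String predecessorKeyString,
                                  List<Tuple> replicatedInput) {
            // One entry per replicated (non-fragment) input, keyed by predecessor operator key.
            Map<String, List<Tuple>> broadcasts = new HashMap<String, List<Tuple>>();
            broadcasts.put(predecessorKeyString, replicatedInput);
            join.attachInputs(broadcasts);
        }
    }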
Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
 Mon May 29 15:00:39 2017
@@ -828,6 +828,10 @@ public class POForEach extends PhysicalO
         }
     }
 
+    public PhysicalOperator[] getPlanLeafOps() {
+        return planLeafOps;
+    }
+
     public void setMapSideOnly(boolean mapSideOnly) {
         this.mapSideOnly = mapSideOnly;
     }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java
 Mon May 29 15:00:39 2017
@@ -74,7 +74,13 @@ public class POGlobalRearrange extends P
     public POGlobalRearrange(OperatorKey k, int rp, List inp) {
         super(k, rp, inp);
     }
-    
+
+    public POGlobalRearrange(POGlobalRearrange copy) throws ExecException {
+        super(copy);
+        this.cross = copy.cross;
+        this.customPartitioner = copy.customPartitioner;
+    }
+
     @Override
     public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitGlobalRearrange(this);

