Author: zly Date: Wed Mar 1 08:56:54 2017 New Revision: 1784882 URL: http://svn.apache.org/viewvc?rev=1784882&view=rev Log: PIG-5154:Fix GFCross related issues after merging from trunk to spark(Adam via Liyun)
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1784882&r1=1784881&r2=1784882&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Wed Mar 1 08:56:54 2017 @@ -25,28 +25,26 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.apache.pig.PigConstants; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; -import org.apache.pig.backend.hadoop.executionengine.spark.SparkEngineConf; -import org.apache.pig.impl.util.UDFContext; -import org.apache.pig.tools.pigstats.spark.SparkCounters; -import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter; -import org.apache.pig.tools.pigstats.spark.SparkStatsUtil; import scala.Function1; import scala.Tuple2; import scala.runtime.AbstractFunction1; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.pig.LoadFunc; +import org.apache.pig.PigConstants; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkEngineConf; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.backend.hadoop.executionengine.spark.running.PigInputFormatSpark; import org.apache.pig.data.Tuple; @@ -54,6 +52,10 @@ import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.util.ObjectSerializer; +import org.apache.pig.impl.util.UDFContext; +import org.apache.pig.tools.pigstats.spark.SparkCounters; +import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter; +import org.apache.pig.tools.pigstats.spark.SparkStatsUtil; import org.apache.spark.SparkContext; import org.apache.spark.TaskContext; import org.apache.spark.rdd.RDD; @@ -170,8 +172,9 @@ public class LoadConverter implements RD public Tuple apply(Tuple2<Text, Tuple> v1) { if (!initialized) { long partitionId = TaskContext.get().partitionId(); - PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, Long.toString(partitionId)); - + Configuration jobConf = PigMapReduce.sJobConfInternal.get(); + jobConf.set(PigConstants.TASK_INDEX, Long.toString(partitionId)); + jobConf.set(MRConfiguration.TASK_ID, Long.toString(partitionId)); initialized = true; } if (sparkCounters != null && disableCounter == false) {