Author: xuefu Date: Thu Jun 18 04:08:23 2015 New Revision: 1686137 URL: http://svn.apache.org/r1686137 Log: PIG-4606: Enable TestDefaultDateTimeZone unit tests in spark mode (Liyun via Xuefu)
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1686137&r1=1686136&r2=1686137&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Thu Jun 18 04:08:23 2015 @@ -89,6 +89,7 @@ import org.apache.pig.impl.plan.Operator import org.apache.pig.impl.plan.PlanException; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.JarManager; +import org.apache.pig.impl.util.Utils; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.spark.SparkPigStats; import org.apache.pig.tools.pigstats.spark.SparkStatsUtil; @@ -112,6 +113,7 @@ public class SparkLauncher extends Launc private static JobMetricsListener jobMetricsListener = new JobMetricsListener(); private String jobGroupID; private PigContext pigContext = null; + private JobConf jobConf = null; private String currentDirectoryPath = null; @Override @@ -120,12 +122,7 @@ public class SparkLauncher extends Launc if (LOG.isDebugEnabled()) LOG.debug(physicalPlan); this.pigContext = pigContext; - saveUdfImportList(pigContext); - JobConf jobConf = SparkUtil.newJobConf(pigContext); - jobConf.set(PigConstants.LOCAL_CODE_DIR, - System.getProperty("java.io.tmpdir")); - - SchemaTupleBackend.initialize(jobConf, pigContext); + initialize(); SparkOperPlan sparkplan = compile(physicalPlan, pigContext); if (LOG.isDebugEnabled()) explain(sparkplan, System.out, "text", true); @@ -146,7 +143,7 @@ public class SparkLauncher extends Launc this.currentDirectoryPath = 
Paths.get(".").toAbsolutePath() .normalize().toString() + "/"; - startSparkJob(); + addFilesToSparkJob(); LinkedList<POStore> stores = PlanHelper.getPhysicalOperators( physicalPlan, POStore.class); POStore firstStore = stores.getFirst(); @@ -272,15 +269,14 @@ public class SparkLauncher extends Launc } } - private void startSparkJob() throws IOException { - LOG.info("start Spark Job"); + private void addFilesToSparkJob() throws IOException { + LOG.info("add Files to Spark Job"); String shipFiles = pigContext.getProperties().getProperty( "pig.streaming.ship.files"); shipFiles(shipFiles); String cacheFiles = pigContext.getProperties().getProperty( "pig.streaming.cache.files"); cacheFiles(cacheFiles); - } @@ -645,10 +641,19 @@ public class SparkLauncher extends Launc * Later we will use PigContext#properties.getProperty("spark.udf.import.list")in PigContext#writeObject. * we don't save this value in PigContext#properties.getProperty("udf.import.list") * because this will cause OOM problem(detailed see PIG-4295). - * @param pigContext */ - private void saveUdfImportList(PigContext pigContext) { + private void saveUdfImportList() { String udfImportList = Joiner.on(",").join(PigContext.getPackageImportList()); pigContext.getProperties().setProperty("spark.udf.import.list", udfImportList); } + + private void initialize() throws IOException { + saveUdfImportList(); + jobConf = SparkUtil.newJobConf(pigContext); + jobConf.set(PigConstants.LOCAL_CODE_DIR, + System.getProperty("java.io.tmpdir")); + + SchemaTupleBackend.initialize(jobConf, pigContext); + Utils.setDefaultTimeZone(jobConf); + } }