Author: xuefu
Date: Thu Jun 18 04:08:23 2015
New Revision: 1686137

URL: http://svn.apache.org/r1686137
Log:
PIG-4606: Enable TestDefaultDateTimeZone unit tests in spark mode (Liyun via Xuefu)

Modified:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1686137&r1=1686136&r2=1686137&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 Thu Jun 18 04:08:23 2015
@@ -89,6 +89,7 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.spark.SparkPigStats;
 import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
@@ -112,6 +113,7 @@ public class SparkLauncher extends Launc
        private static JobMetricsListener jobMetricsListener = new 
JobMetricsListener();
        private String jobGroupID;
     private PigContext pigContext = null;
+    private JobConf jobConf = null;
     private String currentDirectoryPath = null;
 
        @Override
@@ -120,12 +122,7 @@ public class SparkLauncher extends Launc
                if (LOG.isDebugEnabled())
                    LOG.debug(physicalPlan);
         this.pigContext = pigContext;
-        saveUdfImportList(pigContext);
-               JobConf jobConf = SparkUtil.newJobConf(pigContext);
-               jobConf.set(PigConstants.LOCAL_CODE_DIR,
-                               System.getProperty("java.io.tmpdir"));
-
-               SchemaTupleBackend.initialize(jobConf, pigContext);
+               initialize();
                SparkOperPlan sparkplan = compile(physicalPlan, pigContext);
                if (LOG.isDebugEnabled())
                          explain(sparkplan, System.out, "text", true);
@@ -146,7 +143,7 @@ public class SparkLauncher extends Launc
                this.currentDirectoryPath = Paths.get(".").toAbsolutePath()
                                .normalize().toString()
                                + "/";
-               startSparkJob();
+               addFilesToSparkJob();
                LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
                                physicalPlan, POStore.class);
                POStore firstStore = stores.getFirst();
@@ -272,15 +269,14 @@ public class SparkLauncher extends Launc
                }
        }
 
-       private void startSparkJob() throws IOException {
-               LOG.info("start Spark Job");
+       private void addFilesToSparkJob() throws IOException {
+               LOG.info("add Files Spark Job");
                String shipFiles = pigContext.getProperties().getProperty(
                                "pig.streaming.ship.files");
                shipFiles(shipFiles);
                String cacheFiles = pigContext.getProperties().getProperty(
                                "pig.streaming.cache.files");
                cacheFiles(cacheFiles);
-
        }
 
 
@@ -645,10 +641,19 @@ public class SparkLauncher extends Launc
      * Later we will use PigContext#properties.getProperty("spark.udf.import.list")in PigContext#writeObject.
      * we don't save this value in PigContext#properties.getProperty("udf.import.list")
      * because this will cause OOM problem(detailed see PIG-4295).
-     * @param pigContext
      */
-    private void saveUdfImportList(PigContext pigContext) {
+    private void saveUdfImportList() {
         String udfImportList = 
Joiner.on(",").join(PigContext.getPackageImportList());
         pigContext.getProperties().setProperty("spark.udf.import.list", 
udfImportList);
     }
+
+    private void initialize() throws IOException {
+        saveUdfImportList();
+        jobConf = SparkUtil.newJobConf(pigContext);
+        jobConf.set(PigConstants.LOCAL_CODE_DIR,
+                System.getProperty("java.io.tmpdir"));
+
+        SchemaTupleBackend.initialize(jobConf, pigContext);
+        Utils.setDefaultTimeZone(jobConf);
+    }
 }


Reply via email to