Author: xuefu
Date: Wed Feb  4 21:32:34 2015
New Revision: 1657405

URL: http://svn.apache.org/r1657405
Log:
HIVE-9572: Merge from Spark branch to trunk 02/03/2015 (reviewed by Brock)

Added:
    hive/trunk/data/conf/spark/standalone/
      - copied from r1657401, hive/branches/spark/data/conf/spark/standalone/
    hive/trunk/data/conf/spark/yarn-client/
      - copied from r1657401, hive/branches/spark/data/conf/spark/yarn-client/
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkSMBMapJoinInfo.java
      - copied unchanged from r1657401, hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkSMBMapJoinInfo.java
    hive/trunk/ql/src/test/queries/clientpositive/lateral_view_explode2.q
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/queries/clientpositive/lateral_view_explode2.q
    hive/trunk/ql/src/test/results/clientpositive/lateral_view_explode2.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/lateral_view_explode2.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/bucket5.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket5.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/bucket6.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket6.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/bucketizedhiveinputformat.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/bucketizedhiveinputformat.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/constprog_partitioner.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/constprog_partitioner.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/empty_dir_in_table.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/empty_dir_in_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/external_table_with_space_in_location_path.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/external_table_with_space_in_location_path.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/file_with_header_footer.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/file_with_header_footer.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/import_exported_table.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/import_exported_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/index_bitmap3.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/index_bitmap3.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/index_bitmap_auto.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/index_bitmap_auto.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/infer_bucket_sort_bucketed_table.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/infer_bucket_sort_bucketed_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/infer_bucket_sort_map_operators.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/infer_bucket_sort_map_operators.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/infer_bucket_sort_merge.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/infer_bucket_sort_merge.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/infer_bucket_sort_num_buckets.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/infer_bucket_sort_num_buckets.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/infer_bucket_sort_reducers_power_two.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/infer_bucket_sort_reducers_power_two.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/input16_cc.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/input16_cc.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/list_bucket_dml_10.q.java1.7.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/list_bucket_dml_10.q.java1.7.out
    hive/trunk/ql/src/test/results/clientpositive/spark/load_fs2.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/load_fs2.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/load_hdfs_file_with_space_in_the_name.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/load_hdfs_file_with_space_in_the_name.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/ql_rewrite_gbtoidx.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/ql_rewrite_gbtoidx.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/ql_rewrite_gbtoidx_cbo_1.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/ql_rewrite_gbtoidx_cbo_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/quotedid_smb.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/quotedid_smb.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/reduce_deduplicate.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/reduce_deduplicate.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/remote_script.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/remote_script.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/root_dir_external_table.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/root_dir_external_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/schemeAuthority.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/schemeAuthority.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/schemeAuthority2.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/schemeAuthority2.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/temp_table_external.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/temp_table_external.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/truncate_column_buckets.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/truncate_column_buckets.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/uber_reduce.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/uber_reduce.q.out
    hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/MiniSparkOnYARNCluster.java
      - copied unchanged from r1657401, hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/MiniSparkOnYARNCluster.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
      - copied unchanged from r1657401, hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/SaslHandler.java
      - copied unchanged from r1657401, hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/SaslHandler.java
Removed:
    hive/trunk/data/conf/spark/hive-site.xml
Modified:
    hive/trunk/   (props changed)
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/hbase-handler/pom.xml   (props changed)
    hive/trunk/itests/pom.xml
    hive/trunk/itests/qtest-spark/pom.xml
    hive/trunk/itests/src/test/resources/testconfiguration.properties
    hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
    hive/trunk/pom.xml
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
    hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
    hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
    hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/README.md
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
    hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestKryoMessageCodec.java
    hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java

Propchange: hive/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb  4 21:32:34 2015
@@ -1,5 +1,5 @@
 /hive/branches/branch-0.11:1480385,1480458,1481120,1481344,1481346,1481348,1481352,1483872,1505184
 /hive/branches/cbo:1605012-1627125
-/hive/branches/spark:1608589-1654414
+/hive/branches/spark:1608589-1657401
 /hive/branches/tez:1494760-1622766
 /hive/branches/vectorization:1466908-1527856

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Feb  4 21:32:34 2015
@@ -2030,7 +2030,9 @@ public class HiveConf extends Configurat
     SPARK_RPC_MAX_MESSAGE_SIZE("hive.spark.client.rpc.max.size", 50 * 1024 * 1024,
       "Maximum message size in bytes for communication between Hive client and remote Spark driver. Default is 50MB."),
     SPARK_RPC_CHANNEL_LOG_LEVEL("hive.spark.client.channel.log.level", null,
-      "Channel logging level for remote Spark driver.  One of {DEBUG, ERROR, INFO, TRACE, WARN}.");
+      "Channel logging level for remote Spark driver.  One of {DEBUG, ERROR, INFO, TRACE, WARN}."),
+    SPARK_RPC_SASL_MECHANISM("hive.spark.client.rpc.sasl.mechanisms", "DIGEST-MD5",
+      "Name of the SASL mechanism to use for authentication.");
 
     public final String varname;
     private final String defaultExpr;
@@ -2270,10 +2272,33 @@ public class HiveConf extends Configurat
       throw new IllegalArgumentException("Cannot modify " + name + " at runtime. It is in the list"
           + "of parameters that can't be modified at runtime");
     }
-    isSparkConfigUpdated = name.startsWith("spark");
+    isSparkConfigUpdated = isSparkRelatedConfig(name);
     set(name, value);
   }
 
+  /**
+   * check whether spark related property is updated, which includes spark configurations,
+   * RSC configurations and yarn configuration in Spark on YARN mode.
+   * @param name
+   * @return
+   */
+  private boolean isSparkRelatedConfig(String name) {
+    boolean result = false;
+    if (name.startsWith("spark")) { // Spark property.
+      result = true;
+    } else if (name.startsWith("yarn")) { // YARN property in Spark on YARN 
mode.
+      String sparkMaster = get("spark.master");
+      if (sparkMaster != null &&
+        (sparkMaster.equals("yarn-client") || sparkMaster.equals("yarn-cluster"))) {
+        result = true;
+      }
+    } else if (name.startsWith("hive.spark")) { // Remote Spark Context 
property.
+      result = true;
+    }
+
+    return result;
+  }
+
   public static int getIntVar(Configuration conf, ConfVars var) {
     assert (var.valClass == Integer.class) : var.varname;
     return conf.getInt(var.varname, var.defaultIntVal);
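
For reference, the new HiveConf.isSparkRelatedConfig() above treats a key as Spark-related when it starts with "spark", when it starts with "hive.spark" (Remote Spark Context settings), or when it starts with "yarn" while spark.master is yarn-client or yarn-cluster. A minimal standalone sketch of that classification rule; the class and method names below are illustrative and not part of this patch:

// Standalone sketch of the rule implemented by isSparkRelatedConfig().
public class SparkConfClassifierSketch {
  static boolean isSparkRelated(String name, String sparkMaster) {
    if (name.startsWith("spark") || name.startsWith("hive.spark")) {
      return true; // Spark property or Remote Spark Context property
    }
    // YARN properties only count when Spark runs on YARN.
    return name.startsWith("yarn") && sparkMaster != null
        && (sparkMaster.equals("yarn-client") || sparkMaster.equals("yarn-cluster"));
  }

  public static void main(String[] args) {
    System.out.println(isSparkRelated("spark.executor.memory", "local"));          // true
    System.out.println(isSparkRelated("hive.spark.client.rpc.max.size", "local")); // true
    System.out.println(isSparkRelated("yarn.nodemanager.address", "yarn-client")); // true
    System.out.println(isSparkRelated("yarn.nodemanager.address", "local"));       // false
  }
}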

Propchange: hive/trunk/hbase-handler/pom.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb  4 21:32:34 2015
@@ -1,6 +1,6 @@
 /hive/branches/branch-0.11/hbase-handler/pom.xml:1480385,1480458,1481120,1481344,1481346,1481348,1481352,1483872,1505184
 /hive/branches/cbo/hbase-handler/pom.xml:1605012-1627125
-/hive/branches/spark/hbase-handler/pom.xml:1608589-1654414
+/hive/branches/spark/hbase-handler/pom.xml:1608589-1657401
 /hive/branches/tez/hbase-handler/pom.xml:1494760-1622766
 /hive/branches/vectorization/hbase-handler/pom.xml:1466908-1527856
 /hive/trunk/hbase-handler/pom.xml:1494760-1537575

Modified: hive/trunk/itests/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/itests/pom.xml?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/itests/pom.xml (original)
+++ hive/trunk/itests/pom.xml Wed Feb  4 21:32:34 2015
@@ -88,7 +88,7 @@
                      curl -Sso $DOWNLOAD_DIR/$tarName $url
                     fi
                     tar -zxf $DOWNLOAD_DIR/$tarName -C $BASE_DIR
-                    mv $BASE_DIR/${finalName}* $BASE_DIR/$finalName
+                    mv $BASE_DIR/spark-${spark.version}-bin-hadoop2-without-hive $BASE_DIR/$finalName
                   }
                   mkdir -p $DOWNLOAD_DIR
                   download 
"http://d3jw87u4immizc.cloudfront.net/spark-tarball/spark-${spark.version}-bin-hadoop2-without-hive.tgz";
 "spark"

Modified: hive/trunk/itests/qtest-spark/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/itests/qtest-spark/pom.xml?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/itests/qtest-spark/pom.xml (original)
+++ hive/trunk/itests/qtest-spark/pom.xml Wed Feb  4 21:32:34 2015
@@ -37,7 +37,6 @@
     <qfile></qfile>
     <qfile_regex></qfile_regex>
     <run_disabled>false</run_disabled>
-    <clustermode>spark</clustermode>
     <execute.beeline.tests>false</execute.beeline.tests>
     <active.hadoop.version>${hadoop-23.version}</active.hadoop.version>
     <test.dfs.mkdir>-mkdir -p</test.dfs.mkdir>
@@ -333,22 +332,39 @@
                 <taskdef name="qtestgen" classname="org.apache.hadoop.hive.ant.QTestGenTask"
                   classpath="${test.classpath}" />
                 <mkdir dir="${project.build.directory}/qfile-results/clientpositive/spark" />
+                <mkdir dir="${project.build.directory}/qfile-results/clientpositive/miniSparkOnYarn" />
                 <qtestgen hiveRootDirectory="${basedir}/${hive.path.to.root}/"
                   outputDirectory="${project.build.directory}/generated-test-sources/java/org/apache/hadoop/hive/cli/"
                   templatePath="${basedir}/${hive.path.to.root}/ql/src/test/templates/" template="TestCliDriver.vm"
                   queryDirectory="${basedir}/${hive.path.to.root}/ql/src/test/queries/clientpositive/"
                   queryFile="${qfile}"
                   queryFileRegex="${qfile_regex}"
-                  clusterMode="${clustermode}"
+                  clusterMode="spark"
                   includeQueryFile="${spark.query.files}"
                   runDisabled="${run_disabled}"
-                  hiveConfDir="${basedir}/${hive.path.to.root}/data/conf/spark"
+                  hiveConfDir="${basedir}/${hive.path.to.root}/data/conf/spark/standalone"
                   resultsDirectory="${basedir}/${hive.path.to.root}/ql/src/test/results/clientpositive/spark"
                   className="TestSparkCliDriver"
                   logFile="${project.build.directory}/testsparkclidrivergen.log"
                   logDirectory="${project.build.directory}/qfile-results/clientpositive/spark"
                   initScript="q_test_init.sql"
                   cleanupScript="q_test_cleanup.sql"/>
+                <qtestgen hiveRootDirectory="${basedir}/${hive.path.to.root}/"
+                  outputDirectory="${project.build.directory}/generated-test-sources/java/org/apache/hadoop/hive/cli/"
+                  templatePath="${basedir}/${hive.path.to.root}/ql/src/test/templates/" template="TestCliDriver.vm"
+                  queryDirectory="${basedir}/${hive.path.to.root}/ql/src/test/queries/clientpositive/"
+                  queryFile="${qfile}"
+                  queryFileRegex="${qfile_regex}"
+                  clusterMode="miniSparkOnYarn"
+                  includeQueryFile="${miniSparkOnYarn.query.files}"
+                  runDisabled="${run_disabled}"
+                  hiveConfDir="${basedir}/${hive.path.to.root}/data/conf/spark/yarn-client"
+                  resultsDirectory="${basedir}/${hive.path.to.root}/ql/src/test/results/clientpositive/spark"
+                  className="TestMiniSparkOnYarnCliDriver"
+                  logFile="${project.build.directory}/testminisparkonyarnclidrivergen.log"
+                  logDirectory="${project.build.directory}/qfile-results/clientpositive/spark"
+                  initScript="q_test_init.sql"
+                  cleanupScript="q_test_cleanup.sql"/>
               </target>
             </configuration>
             <goals>

Modified: hive/trunk/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/trunk/itests/src/test/resources/testconfiguration.properties?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/trunk/itests/src/test/resources/testconfiguration.properties Wed Feb  4 21:32:34 2015
@@ -715,6 +715,7 @@ spark.query.files=add_part_multiple.q, \
   join_thrift.q, \
   join_vc.q, \
   join_view.q, \
+  lateral_view_explode2.q, \
   leftsemijoin.q, \
   leftsemijoin_mr.q, \
   limit_partition_metadataonly.q, \
@@ -1000,3 +1001,47 @@ spark.query.files=add_part_multiple.q, \
   vectorized_string_funcs.q, \
   vectorized_timestamp_funcs.q, \
   windowing.q
+
+miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
+  bucket4.q,\
+  bucket5.q,\
+  bucket6.q,\
+  bucketizedhiveinputformat.q,\
+  bucketmapjoin6.q,\
+  bucketmapjoin7.q,\
+  constprog_partitioner.q,\
+  disable_merge_for_bucketing.q,\
+  empty_dir_in_table.q,\
+  external_table_with_space_in_location_path.q,\
+  file_with_header_footer.q,\
+  import_exported_table.q,\
+  index_bitmap3.q,\
+  index_bitmap_auto.q,\
+  infer_bucket_sort_bucketed_table.q,\
+  infer_bucket_sort_map_operators.q,\
+  infer_bucket_sort_merge.q,\
+  infer_bucket_sort_num_buckets.q,\
+  infer_bucket_sort_reducers_power_two.q,\
+  input16_cc.q,\
+  leftsemijoin_mr.q,\
+  list_bucket_dml_10.q,\
+  load_fs2.q,\
+  load_hdfs_file_with_space_in_the_name.q,\
+  optrstat_groupby.q,\
+  parallel_orderby.q,\
+  ql_rewrite_gbtoidx.q,\
+  ql_rewrite_gbtoidx_cbo_1.q,\
+  quotedid_smb.q,\
+  reduce_deduplicate.q,\
+  remote_script.q,\
+  root_dir_external_table.q,\
+  schemeAuthority.q,\
+  schemeAuthority2.q,\
+  scriptfile1.q,\
+  scriptfile1_win.q,\
+  smb_mapjoin_8.q,\
+  stats_counter.q,\
+  stats_counter_partitioned.q,\
+  temp_table_external.q,\
+  truncate_column_buckets.q,\
+  uber_reduce.q

Modified: hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java (original)
+++ hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java Wed Feb  4 21:32:34 2015
@@ -302,6 +302,7 @@ public class QTestUtil {
     tez,
     spark,
     encrypted,
+    miniSparkOnYarn,
     none;
 
     public static MiniClusterType valueForString(String type) {
@@ -313,6 +314,8 @@ public class QTestUtil {
         return spark;
       } else if (type.equals("encrypted")) {
         return encrypted;
+      } else if (type.equals("miniSparkOnYarn")) {
+        return miniSparkOnYarn;
       } else {
         return none;
       }
@@ -380,6 +383,8 @@ public class QTestUtil {
       String uriString = WindowsPathUtil.getHdfsUriString(fs.getUri().toString());
       if (clusterType == MiniClusterType.tez) {
         mr = shims.getMiniTezCluster(conf, 4, uriString, 1);
+      } else if (clusterType == MiniClusterType.miniSparkOnYarn) {
+        mr = shims.getMiniSparkCluster(conf, 4, uriString, 1);
       } else {
         mr = shims.getMiniMrCluster(conf, 4, uriString, 1);
       }
@@ -875,7 +880,8 @@ public class QTestUtil {
     ss.setIsSilent(true);
     SessionState oldSs = SessionState.get();
 
-    if (oldSs != null && (clusterType == MiniClusterType.tez || clusterType == MiniClusterType.spark)) {
+    if (oldSs != null && (clusterType == MiniClusterType.tez || clusterType == MiniClusterType.spark
+        || clusterType == MiniClusterType.miniSparkOnYarn)) {
       sparkSession = oldSs.getSparkSession();
       ss.setSparkSession(sparkSession);
       oldSs.setSparkSession(null);
@@ -935,7 +941,8 @@ public class QTestUtil {
     ss.err = System.out;
 
     SessionState oldSs = SessionState.get();
-    if (oldSs != null && (clusterType == MiniClusterType.tez || clusterType == MiniClusterType.spark)) {
+    if (oldSs != null && (clusterType == MiniClusterType.tez || clusterType == MiniClusterType.spark
+        || clusterType == MiniClusterType.miniSparkOnYarn)) {
       sparkSession = oldSs.getSparkSession();
       ss.setSparkSession(sparkSession);
       oldSs.setSparkSession(null);

Modified: hive/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/pom.xml?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/pom.xml (original)
+++ hive/trunk/pom.xml Wed Feb  4 21:32:34 2015
@@ -825,6 +825,7 @@
             <HADOOP_CLASSPATH>${test.tmp.dir}/conf:${basedir}/${hive.path.to.root}/conf</HADOOP_CLASSPATH>
             <HIVE_HADOOP_TEST_CLASSPATH>${test.hive.hadoop.classpath}</HIVE_HADOOP_TEST_CLASSPATH>
             <SPARK_SUBMIT_CLASSPATH>${spark.home}/lib/spark-assembly-${spark.version}-hadoop2.4.0.jar:${test.hive.hadoop.classpath}</SPARK_SUBMIT_CLASSPATH>
+            <SPARK_OSX_TEST_OPTS>-Dorg.xerial.snappy.tempdir=/tmp -Dorg.xerial.snappy.lib.name=libsnappyjava.jnilib</SPARK_OSX_TEST_OPTS>
             <PATH>${env.PATH}${test.extra.path}</PATH>
           </environmentVariables>
           <systemPropertyVariables>

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Feb  4 21:32:34 2015
@@ -207,6 +207,7 @@ public final class Utilities {
   public static final String INPUT_NAME = "iocontext.input.name";
   public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class";
   public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class";
+  public static final String HIVE_ADDED_JARS = "hive.added.jars";
 
   /**
    * ReduceField:
@@ -364,6 +365,18 @@ public final class Utilities {
     Path path = null;
     InputStream in = null;
     try {
+      String engine = HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE);
+      if (engine.equals("spark")) {
+        // TODO Add jar into current thread context classloader as it may be invoked by Spark driver inside
+        // threads, should be unnecessary while SPARK-5377 is resolved.
+        String addedJars = conf.get(HIVE_ADDED_JARS);
+        if (addedJars != null && !addedJars.isEmpty()) {
+          ClassLoader loader = Thread.currentThread().getContextClassLoader();
+          ClassLoader newLoader = addToClassPath(loader, addedJars.split(";"));
+          Thread.currentThread().setContextClassLoader(newLoader);
+        }
+      }
+
       path = getPlanPath(conf, name);
       LOG.info("PLAN PATH = " + path);
       assert path != null;
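
The hunk above makes Utilities.getBaseWork() push any hive.added.jars entries onto the calling thread's context classloader before deserializing the plan, since the Spark driver may call it from threads that never processed ADD JAR. A generic sketch of that classloader pattern under those assumptions (this is not Hive's Utilities.addToClassPath; the class below is illustrative):

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

// Wrap the current context classloader in one that also sees the added jars,
// then install it back on the thread.
public class AddedJarsLoaderSketch {
  static void addJarsToContextClassLoader(String semicolonSeparatedJars) throws Exception {
    if (semicolonSeparatedJars == null || semicolonSeparatedJars.isEmpty()) {
      return;
    }
    String[] paths = semicolonSeparatedJars.split(";");
    URL[] urls = new URL[paths.length];
    for (int i = 0; i < paths.length; i++) {
      urls[i] = new File(paths[i]).toURI().toURL();
    }
    ClassLoader current = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(new URLClassLoader(urls, current));
  }

  public static void main(String[] args) throws Exception {
    addJarsToContextClassLoader("/tmp/my-udfs.jar;/tmp/my-serde.jar"); // hypothetical jar paths
    System.out.println(Thread.currentThread().getContextClassLoader());
  }
}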

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java Wed Feb  4 21:32:34 2015
@@ -42,6 +42,7 @@ public class HiveSparkClientFactory {
   private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
   private static final String SPARK_DEFAULT_MASTER = "local";
   private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";
+  private static final String SPARK_DEFAULT_SERIALIZER = "org.apache.spark.serializer.KryoSerializer";
 
   public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf)
     throws IOException, SparkException {
@@ -64,8 +65,7 @@ public class HiveSparkClientFactory {
     // set default spark configurations.
     sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
     sparkConf.put("spark.app.name", SPARK_DEFAULT_APP_NAME);
-    sparkConf.put("spark.serializer",
-      "org.apache.spark.serializer.KryoSerializer");
+    sparkConf.put("spark.serializer", SPARK_DEFAULT_SERIALIZER);
 
     // load properties from spark-defaults.conf.
     InputStream inputStream = null;
@@ -81,7 +81,7 @@ public class HiveSparkClientFactory {
             String value = properties.getProperty(propertyName);
             sparkConf.put(propertyName, properties.getProperty(propertyName));
             LOG.info(String.format(
-              "load spark configuration from %s (%s -> %s).",
+              "load spark property from %s (%s -> %s).",
               SPARK_DEFAULT_CONF_FILE, propertyName, value));
           }
         }
@@ -99,22 +99,36 @@ public class HiveSparkClientFactory {
       }
     }
 
-    // load properties from hive configurations, including both spark.* properties
-    // and properties for remote driver RPC.
+    // load properties from hive configurations, including both spark.* properties,
+    // properties for remote driver RPC, and yarn properties for Spark on YARN mode.
+    String sparkMaster = hiveConf.get("spark.master");
+    if (sparkMaster == null) {
+      sparkMaster = sparkConf.get("spark.master");
+    }
     for (Map.Entry<String, String> entry : hiveConf) {
       String propertyName = entry.getKey();
       if (propertyName.startsWith("spark")) {
         String value = hiveConf.get(propertyName);
         sparkConf.put(propertyName, value);
         LOG.info(String.format(
-          "load spark configuration from hive configuration (%s -> %s).",
+          "load spark property from hive configuration (%s -> %s).",
           propertyName, value));
+      } else if (propertyName.startsWith("yarn") &&
+        (sparkMaster.equals("yarn-client") || sparkMaster.equals("yarn-cluster"))) {
+        String value = hiveConf.get(propertyName);
+        // Add spark.hadoop prefix for yarn properties as SparkConf only accept properties
+        // started with spark prefix, Spark would remove spark.hadoop prefix lately and add
+        // it to its hadoop configuration.
+        sparkConf.put("spark.hadoop." + propertyName, value);
+        LOG.info(String.format(
+          "load yarn property from hive configuration in %s mode (%s -> %s).",
+          sparkMaster, propertyName, value));
       }
       if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) {
         String value = RpcConfiguration.getValue(hiveConf, propertyName);
         sparkConf.put(propertyName, value);
         LOG.info(String.format(
-          "load RPC configuration from hive configuration (%s -> %s).",
+          "load RPC property from hive configuration (%s -> %s).",
           propertyName, value));
       }
     }
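
The comment in the hunk above describes the forwarding rule: SparkConf only carries keys that start with "spark.", so in yarn-client/yarn-cluster mode Hive re-publishes yarn.* settings under a "spark.hadoop." prefix, and Spark later strips that prefix into its Hadoop configuration. A standalone sketch of the rule; the class name and sample value below are illustrative, not the factory code itself:

import java.util.HashMap;
import java.util.Map;

public class YarnPropForwardingSketch {
  static Map<String, String> forward(Map<String, String> hiveProps, String sparkMaster) {
    Map<String, String> sparkConf = new HashMap<String, String>();
    boolean onYarn = "yarn-client".equals(sparkMaster) || "yarn-cluster".equals(sparkMaster);
    for (Map.Entry<String, String> e : hiveProps.entrySet()) {
      if (e.getKey().startsWith("spark")) {
        sparkConf.put(e.getKey(), e.getValue());              // passed through unchanged
      } else if (onYarn && e.getKey().startsWith("yarn")) {
        sparkConf.put("spark.hadoop." + e.getKey(), e.getValue()); // prefixed so SparkConf accepts it
      }
    }
    return sparkConf;
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<String, String>();
    props.put("yarn.resourcemanager.address", "rm-host:8032"); // hypothetical value
    props.put("spark.executor.memory", "2g");
    // yarn key comes out as spark.hadoop.yarn.resourcemanager.address
    System.out.println(forward(props, "yarn-client"));
  }
}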

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Wed Feb  4 21:32:34 2015
@@ -17,15 +17,20 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,15 +53,13 @@ import org.apache.hive.spark.client.JobC
 import org.apache.hive.spark.client.JobHandle;
 import org.apache.hive.spark.client.SparkClient;
 import org.apache.hive.spark.client.SparkClientFactory;
+import org.apache.hive.spark.client.SparkClientUtilities;
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaPairRDD;
 
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-
 /**
  * RemoteSparkClient is a wrapper of {@link org.apache.hive.spark.client.SparkClient}, which
  * wrap a spark job request and send to an remote SparkContext.
@@ -74,8 +77,8 @@ public class RemoteHiveSparkClient imple
   private transient SparkConf sparkConf;
   private transient HiveConf hiveConf;
 
-  private transient List<String> localJars = new ArrayList<String>();
-  private transient List<String> localFiles = new ArrayList<String>();
+  private transient List<URL> localJars = new ArrayList<URL>();
+  private transient List<URL> localFiles = new ArrayList<URL>();
 
   private final transient long sparkClientTimtout;
 
@@ -159,26 +162,28 @@ public class RemoteHiveSparkClient imple
 
   private void addResources(String addedFiles) {
     for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) {
-      if (!localFiles.contains(addedFile)) {
-        localFiles.add(addedFile);
-        try {
-          remoteClient.addFile(SparkUtilities.getURL(addedFile));
-        } catch (MalformedURLException e) {
-          LOG.warn("Failed to add file:" + addedFile);
+      try {
+        URL fileUrl = SparkUtilities.getURL(addedFile);
+        if (fileUrl != null && !localFiles.contains(fileUrl)) {
+          localFiles.add(fileUrl);
+          remoteClient.addFile(fileUrl);
         }
+      } catch (MalformedURLException e) {
+        LOG.warn("Failed to add file:" + addedFile);
       }
     }
   }
 
   private void addJars(String addedJars) {
     for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) {
-      if (!localJars.contains(addedJar)) {
-        localJars.add(addedJar);
-        try {
-          remoteClient.addJar(SparkUtilities.getURL(addedJar));
-        } catch (MalformedURLException e) {
-          LOG.warn("Failed to add jar:" + addedJar);
+      try {
+        URL jarUrl = SparkUtilities.getURL(addedJar);
+        if (jarUrl != null && !localJars.contains(jarUrl)) {
+          localJars.add(jarUrl);
+          remoteClient.addJar(jarUrl);
         }
+      } catch (MalformedURLException e) {
+        LOG.warn("Failed to add jar:" + addedJar);
       }
     }
   }
@@ -208,6 +213,15 @@ public class RemoteHiveSparkClient imple
     @Override
     public Serializable call(JobContext jc) throws Exception {
       JobConf localJobConf = KryoSerializer.deserializeJobConf(jobConfBytes);
+
+      // Add jar to current thread class loader dynamically, and add jar paths to JobConf as Spark
+      // may need to load classes from this jar in other threads.
+      List<String> addedJars = jc.getAddedJars();
+      if (addedJars != null && !addedJars.isEmpty()) {
+        SparkClientUtilities.addToClassPath(addedJars.toArray(new String[addedJars.size()]));
+        localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";"));
+      }
+
       Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);
       SparkWork localSparkWork = KryoSerializer.deserialize(sparkWorkBytes, SparkWork.class);
 
@@ -234,7 +248,6 @@ public class RemoteHiveSparkClient imple
       jc.monitor(future, sparkCounters, plan.getCachedRDDIds());
       return null;
     }
-
   }
 
 }
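
addResources()/addJars() above now key their de-duplication on the resolved URL rather than the raw path string, and a malformed path is only logged rather than aborting the loop. A rough sketch of that pattern; the RemoteClient interface below is a hypothetical stand-in for org.apache.hive.spark.client.SparkClient, and the URL construction stands in for SparkUtilities.getURL():

import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

public class AddJarDedupSketch {
  interface RemoteClient { void addJar(URL jar); }

  private final List<URL> localJars = new ArrayList<URL>();

  void addJars(String commaSeparated, RemoteClient remoteClient) {
    for (String jar : commaSeparated.split(",")) {
      try {
        URL jarUrl = new URL("file://" + jar.trim()); // stand-in for SparkUtilities.getURL()
        if (!localJars.contains(jarUrl)) {            // de-duplicate on the URL, not the raw string
          localJars.add(jarUrl);
          remoteClient.addJar(jarUrl);
        }
      } catch (MalformedURLException e) {
        System.err.println("Failed to add jar: " + jar); // warn and continue
      }
    }
  }

  public static void main(String[] args) {
    AddJarDedupSketch s = new AddJarDedupSketch();
    s.addJars("/tmp/a.jar,/tmp/a.jar,/tmp/b.jar", jar -> System.out.println("added " + jar));
  }
}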

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Wed Feb  4 21:32:34 2015
@@ -233,6 +233,7 @@ public class SparkPlanGenerator {
       throw new IllegalArgumentException(msg, e);
     }
     if (work instanceof MapWork) {
+      cloned.setBoolean("mapred.task.is.map", true);
       List<Path> inputPaths = Utilities.getInputPaths(cloned, (MapWork) work,
           scratchDir, context, false);
       Utilities.setInputPaths(cloned, inputPaths);
@@ -250,6 +251,7 @@ public class SparkPlanGenerator {
       // remember the JobConf cloned for each MapWork, so we won't clone for it again
       workToJobConf.put(work, cloned);
     } else if (work instanceof ReduceWork) {
+      cloned.setBoolean("mapred.task.is.map", false);
       Utilities.setReduceWork(cloned, (ReduceWork) work, scratchDir, false);
       Utilities.createTmpDirs(cloned, (ReduceWork) work);
       cloned.set(Utilities.MAPRED_REDUCER_CLASS, ExecReducer.class.getName());

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java Wed Feb  4 21:32:34 2015
@@ -22,6 +22,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
@@ -43,6 +45,7 @@ import com.google.common.collect.Maps;
 public class LocalSparkJobStatus implements SparkJobStatus {
 
   private final JavaSparkContext sparkContext;
+  private static final Log LOG = LogFactory.getLog(LocalSparkJobStatus.class.getName());
   private int jobId;
   // After SPARK-2321, we only use JobMetricsListener to get job metrics
   // TODO: remove it when the new API provides equivalent functionality
@@ -69,16 +72,20 @@ public class LocalSparkJobStatus impleme
 
   @Override
   public JobExecutionStatus getState() {
+    SparkJobInfo sparkJobInfo = getJobInfo();
     // For spark job with empty source data, it's not submitted actually, so we would never
     // receive JobStart/JobEnd event in JobStateListener, use JavaFutureAction to get current
     // job state.
-    if (future.isDone()) {
+    if (sparkJobInfo == null && future.isDone()) {
+      try {
+        future.get();
+      } catch (Exception e) {
+        LOG.error("Failed to run job " + jobId, e);
+        return JobExecutionStatus.FAILED;
+      }
       return JobExecutionStatus.SUCCEEDED;
-    } else {
-      // SparkJobInfo may not be available yet
-      SparkJobInfo sparkJobInfo = getJobInfo();
-      return sparkJobInfo == null ? null : sparkJobInfo.status();
     }
+    return sparkJobInfo == null ? null : sparkJobInfo.status();
   }
 
   @Override
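
The revised getState() above has to cover jobs with empty source data, which Spark never actually submits: there is no SparkJobInfo, so a completed JavaFutureAction is inspected directly and an exception from get() is mapped to FAILED. A self-contained sketch of that mapping using illustrative names (not the Hive/Spark status APIs):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class FutureStateSketch {
  enum State { SUCCEEDED, FAILED, UNKNOWN }

  // Map a completed future to a terminal state; an exception from get() means failure.
  static State stateOf(Future<?> future) {
    if (!future.isDone()) {
      return State.UNKNOWN;
    }
    try {
      future.get();
      return State.SUCCEEDED;
    } catch (Exception e) {
      return State.FAILED;
    }
  }

  public static void main(String[] args) {
    CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
    CompletableFuture<Void> bad = new CompletableFuture<Void>();
    bad.completeExceptionally(new RuntimeException("job failed"));
    System.out.println(stateOf(ok));  // SUCCEEDED
    System.out.println(stateOf(bad)); // FAILED
  }
}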

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java Wed Feb  4 21:32:34 2015
@@ -179,7 +179,15 @@ public class RemoteSparkJobStatus implem
         if (list != null && list.size() == 1) {
           JavaFutureAction<?> futureAction = list.get(0);
           if (futureAction.isDone()) {
-            jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.SUCCEEDED);
+            boolean futureSucceed = true;
+            try {
+              futureAction.get();
+            } catch (Exception e) {
+              LOG.error("Failed to run job " + sparkJobId, e);
+              futureSucceed = false;
+            }
+            jobInfo = getDefaultJobInfo(sparkJobId,
+                futureSucceed ? JobExecutionStatus.SUCCEEDED : JobExecutionStatus.FAILED);
           }
         }
       }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Wed Feb  4 21:32:34 2015
@@ -316,7 +316,9 @@ public class Vectorizer implements Physi
         for (BaseWork baseWork : sparkWork.getAllWork()) {
           if (baseWork instanceof MapWork) {
             convertMapWork((MapWork) baseWork, false);
-          } else if (baseWork instanceof ReduceWork) {
+          } else if (baseWork instanceof ReduceWork
+              && HiveConf.getBoolVar(pctx.getConf(),
+                  HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) {
             convertReduceWork((ReduceWork) baseWork);
           }
         }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java Wed Feb  4 21:32:34 2015
@@ -1,20 +1,20 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
 import java.util.List;
@@ -36,8 +36,8 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
 /**
-* Operator factory for Spark SMBJoin processing.
-*/
+ * Operator factory for Spark SMBJoin processing.
+ */
 public final class SparkSortMergeJoinFactory {
 
   private SparkSortMergeJoinFactory() {
@@ -45,131 +45,79 @@ public final class SparkSortMergeJoinFac
   }
 
   /**
-   * Get the branch on which we are invoked (walking) from.  See diagram below.
-   * We are at the SMBJoinOp and could have come from TS of any of the input tables.
-   */
-  public static int getPositionParent(SMBMapJoinOperator op,
-      Stack<Node> stack) {
-    int size = stack.size();
-    assert size >= 2 && stack.get(size - 1) == op;
-    @SuppressWarnings("unchecked")
-    Operator<? extends OperatorDesc> parent =
-        (Operator<? extends OperatorDesc>) stack.get(size - 2);
-    List<Operator<? extends OperatorDesc>> parOp = op.getParentOperators();
-    int pos = parOp.indexOf(parent);
-    return pos;
-  }
-
-  /**
-   * SortMergeMapJoin processor, input is a SMBJoinOp that is part of a MapWork:
-   *
-   *  MapWork:
-   *
-   *   (Big)   (Small)  (Small)
-   *    TS       TS       TS
-   *     \       |       /
-   *       \     DS     DS
-   *         \   |    /
-   *          SMBJoinOP
+   * Annotate MapWork, input is a SMBJoinOp that is part of a MapWork, and its root TS operator.
    *
    * 1. Initializes the MapWork's aliasToWork, pointing to big-table's TS.
    * 2. Adds the bucketing information to the MapWork.
    * 3. Adds localwork to the MapWork, with localWork's aliasToWork pointing to small-table's TS.
+   * @param context proc walker context
+   * @param mapWork mapwork to annotate
+   * @param smbMapJoinOp SMB Map Join operator to get data
+   * @param ts Table Scan operator to get data
+   * @param local Whether ts is from a 'local' source (small-table that will be loaded by SMBJoin 'local' task)
    */
-  private static class SortMergeJoinProcessor implements NodeProcessor {
+  public static void annotateMapWork(GenSparkProcContext context, MapWork mapWork,
+    SMBMapJoinOperator smbMapJoinOp, TableScanOperator ts, boolean local)
+    throws SemanticException {
+    initSMBJoinPlan(context, mapWork, ts, local);
+    setupBucketMapJoinInfo(mapWork, smbMapJoinOp);
+  }
 
-    public static void setupBucketMapJoinInfo(MapWork plan, SMBMapJoinOperator currMapJoinOp) {
-      if (currMapJoinOp != null) {
-        Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
-            currMapJoinOp.getConf().getAliasBucketFileNameMapping();
-        if (aliasBucketFileNameMapping != null) {
-          MapredLocalWork localPlan = plan.getMapRedLocalWork();
-          if (localPlan == null) {
-            localPlan = currMapJoinOp.getConf().getLocalWork();
-          } else {
-            // local plan is not null, we want to merge it into SMBMapJoinOperator's local work
-            MapredLocalWork smbLocalWork = currMapJoinOp.getConf().getLocalWork();
-            if (smbLocalWork != null) {
-              localPlan.getAliasToFetchWork().putAll(smbLocalWork.getAliasToFetchWork());
-              localPlan.getAliasToWork().putAll(smbLocalWork.getAliasToWork());
-            }
+  private static void setupBucketMapJoinInfo(MapWork plan, SMBMapJoinOperator currMapJoinOp) {
+    if (currMapJoinOp != null) {
+      Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
+        currMapJoinOp.getConf().getAliasBucketFileNameMapping();
+      if (aliasBucketFileNameMapping != null) {
+        MapredLocalWork localPlan = plan.getMapRedLocalWork();
+        if (localPlan == null) {
+          localPlan = currMapJoinOp.getConf().getLocalWork();
+        } else {
+          // local plan is not null, we want to merge it into SMBMapJoinOperator's local work
+          MapredLocalWork smbLocalWork = currMapJoinOp.getConf().getLocalWork();
+          if (smbLocalWork != null) {
+            localPlan.getAliasToFetchWork().putAll(smbLocalWork.getAliasToFetchWork());
+            localPlan.getAliasToWork().putAll(smbLocalWork.getAliasToWork());
           }
+        }
 
-          if (localPlan == null) {
-            return;
-          }
-          plan.setMapRedLocalWork(null);
-          currMapJoinOp.getConf().setLocalWork(localPlan);
+        if (localPlan == null) {
+          return;
+        }
+        plan.setMapRedLocalWork(null);
+        currMapJoinOp.getConf().setLocalWork(localPlan);
 
-          BucketMapJoinContext bucketMJCxt = new BucketMapJoinContext();
-          localPlan.setBucketMapjoinContext(bucketMJCxt);
-          bucketMJCxt.setAliasBucketFileNameMapping(aliasBucketFileNameMapping);
-          bucketMJCxt.setBucketFileNameMapping(
-              currMapJoinOp.getConf().getBigTableBucketNumMapping());
-          localPlan.setInputFileChangeSensitive(true);
-          bucketMJCxt.setMapJoinBigTableAlias(currMapJoinOp.getConf().getBigTableAlias());
-          bucketMJCxt
-              .setBucketMatcherClass(org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class);
-          bucketMJCxt.setBigTablePartSpecToFileMapping(
-              currMapJoinOp.getConf().getBigTablePartSpecToFileMapping());
+        BucketMapJoinContext bucketMJCxt = new BucketMapJoinContext();
+        localPlan.setBucketMapjoinContext(bucketMJCxt);
+        bucketMJCxt.setAliasBucketFileNameMapping(aliasBucketFileNameMapping);
+        bucketMJCxt.setBucketFileNameMapping(
+          currMapJoinOp.getConf().getBigTableBucketNumMapping());
+        localPlan.setInputFileChangeSensitive(true);
+        bucketMJCxt.setMapJoinBigTableAlias(currMapJoinOp.getConf().getBigTableAlias());
+        bucketMJCxt
+          .setBucketMatcherClass(org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class);
+        bucketMJCxt.setBigTablePartSpecToFileMapping(
+          currMapJoinOp.getConf().getBigTablePartSpecToFileMapping());
 
-          plan.setUseBucketizedHiveInputFormat(true);
+        plan.setUseBucketizedHiveInputFormat(true);
 
-        }
       }
     }
+  }
 
-    /**
-     * Initialize the mapWork.
-     *
-     * @param opProcCtx
-     *          processing context
-     */
-    private static void initSMBJoinPlan(MapWork mapWork,
-                                        GenSparkProcContext opProcCtx, boolean local)
-            throws SemanticException {
-      TableScanOperator ts = (TableScanOperator) opProcCtx.currentRootOperator;
-      String currAliasId = findAliasId(opProcCtx, ts);
-      GenMapRedUtils.setMapWork(mapWork, opProcCtx.parseContext,
-         opProcCtx.inputs, null, ts, currAliasId, opProcCtx.conf, local);
-    }
+  private static void initSMBJoinPlan(GenSparkProcContext opProcCtx,
+    MapWork mapWork, TableScanOperator currentRootOperator, boolean local)
+    throws SemanticException {
+    String currAliasId = findAliasId(opProcCtx, currentRootOperator);
+    GenMapRedUtils.setMapWork(mapWork, opProcCtx.parseContext,
+      opProcCtx.inputs, null, currentRootOperator, currAliasId, opProcCtx.conf, local);
+  }
 
-    private static String findAliasId(GenSparkProcContext opProcCtx, TableScanOperator ts) {
-      for (String alias : opProcCtx.topOps.keySet()) {
-        if (opProcCtx.topOps.get(alias) == ts) {
-          return alias;
-        }
+  private static String findAliasId(GenSparkProcContext opProcCtx, TableScanOperator ts) {
+    for (String alias : opProcCtx.topOps.keySet()) {
+      if (opProcCtx.topOps.get(alias) == ts) {
+        return alias;
       }
-      return null;
     }
-
-    /**
-     * 1. Initializes the MapWork's aliasToWork, pointing to big-table's TS.
-     * 2. Adds the bucketing information to the MapWork.
-     * 3. Adds localwork to the MapWork, with localWork's aliasToWork pointing to small-table's TS.
-     */
-    @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
-      SMBMapJoinOperator mapJoin = (SMBMapJoinOperator) nd;
-      GenSparkProcContext ctx = (GenSparkProcContext) procCtx;
-
-      // find the branch on which this processor was invoked
-      int pos = getPositionParent(mapJoin, stack);
-      boolean local = pos != mapJoin.getConf().getPosBigTable();
-
-      MapWork mapWork = ctx.smbJoinWorkMap.get(mapJoin);
-      initSMBJoinPlan(mapWork, ctx, local);
-
-      // find the associated mapWork that contains this processor.
-      setupBucketMapJoinInfo(mapWork, mapJoin);
-
-      // local aliases need not to hand over context further
-      return false;
-    }
-  }
-
-  public static NodeProcessor getTableScanMapJoin() {
-    return new SortMergeJoinProcessor();
+    return null;
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java Wed Feb  4 21:32:34 2015
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.exec.Ta
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -280,6 +281,10 @@ public abstract class TaskCompiler {
       for (ExecDriver tsk : mrTasks) {
         tsk.setRetryCmdWhenFail(true);
       }
+      List<SparkTask> sparkTasks = Utilities.getSparkTasks(rootTasks);
+      for (SparkTask sparkTask : sparkTasks) {
+        sparkTask.setRetryCmdWhenFail(true);
+      }
     }
 
     Interner<TableDesc> interner = Interners.newStrongInterner();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java Wed Feb  4 21:32:34 2015
@@ -36,7 +36,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
-import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -44,6 +43,7 @@ import org.apache.hadoop.hive.ql.plan.Sp
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
@@ -103,8 +103,8 @@ public class GenSparkProcContext impleme
   // map that says which mapjoin belongs to which work item
   public final Map<MapJoinOperator, List<BaseWork>> mapJoinWorkMap;
 
-  // a map to keep track of which MapWork item holds which SMBMapJoinOp
-  public final Map<SMBMapJoinOperator, MapWork> smbJoinWorkMap;
+  // Map to keep track of which SMB Join operators and their information to annotate their MapWork with.
+  public final Map<SMBMapJoinOperator, SparkSMBMapJoinInfo> smbMapJoinCtxMap;
 
   // a map to keep track of which root generated which work
   public final Map<Operator<?>, BaseWork> rootToWorkMap;
@@ -160,7 +160,7 @@ public class GenSparkProcContext impleme
         new LinkedHashMap<ReduceSinkOperator, ObjectPair<SparkEdgeProperty, ReduceWork>>();
     this.linkOpWithWorkMap = new LinkedHashMap<Operator<?>, Map<BaseWork, SparkEdgeProperty>>();
     this.linkWorkWithReduceSinkMap = new LinkedHashMap<BaseWork, List<ReduceSinkOperator>>();
-    this.smbJoinWorkMap = new LinkedHashMap<SMBMapJoinOperator, MapWork>();
+    this.smbMapJoinCtxMap = new HashMap<SMBMapJoinOperator, SparkSMBMapJoinInfo>();
     this.mapJoinWorkMap = new LinkedHashMap<MapJoinOperator, List<BaseWork>>();
     this.rootToWorkMap = new LinkedHashMap<Operator<?>, BaseWork>();
     this.childToWorkMap = new LinkedHashMap<Operator<?>, List<BaseWork>>();
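For reference, the values in the new smbMapJoinCtxMap are instances of the SparkSMBMapJoinInfo helper introduced by this merge. Its definition is not shown in this diff, but judging from how GenSparkUtils, GenSparkWork and SparkCompiler use it below (mapWork, bigTableRootOp, smallTableRootOps), a minimal sketch could look like the following; treat the exact field types and modifiers as assumptions rather than the committed code.

package org.apache.hadoop.hive.ql.parse.spark;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.plan.MapWork;

// Hypothetical per-SMB-join bookkeeping holder; field names follow their
// usages later in this commit, the real class may differ in detail.
public class SparkSMBMapJoinInfo {
  // MapWork that will later receive the 'local' work and bucket information
  MapWork mapWork;
  // root operator of the big-table branch (a TableScanOperator in practice)
  Operator<?> bigTableRootOp;
  // root operators of the small-table branches
  final List<Operator<?>> smallTableRootOps = new ArrayList<Operator<?>>();
}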

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java 
(original)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java 
Wed Feb  4 21:32:34 2015
@@ -41,10 +41,12 @@ import org.apache.hadoop.hive.ql.exec.Ha
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -443,6 +445,25 @@ public class GenSparkUtils {
     return null;
   }
 
+  /**
+   * Fill MapWork with 'local' work and bucket information for SMB Join.
+   * @param context context, containing references to MapWorks and their SMB information.
+   * @throws SemanticException
+   */
+  public void annotateMapWork(GenSparkProcContext context) throws SemanticException {
+    for (SMBMapJoinOperator smbMapJoinOp : context.smbMapJoinCtxMap.keySet()) {
+      //initialize mapwork with smbMapJoin information.
+      SparkSMBMapJoinInfo smbMapJoinInfo = context.smbMapJoinCtxMap.get(smbMapJoinOp);
+      MapWork work = smbMapJoinInfo.mapWork;
+      SparkSortMergeJoinFactory.annotateMapWork(context, work, smbMapJoinOp,
+        (TableScanOperator) smbMapJoinInfo.bigTableRootOp, false);
+      for (Operator<?> smallTableRootOp : smbMapJoinInfo.smallTableRootOps) {
+        SparkSortMergeJoinFactory.annotateMapWork(context, work, smbMapJoinOp,
+          (TableScanOperator) smallTableRootOp, true);
+      }
+    }
+  }
+
   public synchronized int getNextSeqNumber() {
     return ++sequenceNumber;
   }

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java 
(original)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java 
Wed Feb  4 21:32:34 2015
@@ -34,10 +34,12 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -118,18 +120,12 @@ public class GenSparkWork implements Nod
     } else {
       // create a new vertex
       if (context.preceedingWork == null) {
-        if (smbOp != null) {
-          // This logic is for SortMergeBucket MapJoin case.
-          // This MapWork (of big-table, see above..) is later initialized by SparkMapJoinFactory
-          // processor, so don't initialize it here. Just keep track of it in the context,
-          // for later processing.
-          work = utils.createMapWork(context, root, sparkWork, null, true);
-          if (context.smbJoinWorkMap.get(smbOp) != null) {
-            throw new SemanticException("Each SMBMapJoin should be associated only with one Mapwork");
-          }
-          context.smbJoinWorkMap.put(smbOp, (MapWork) work);
-        } else {
+        if (smbOp == null) {
           work = utils.createMapWork(context, root, sparkWork, null);
+        } else {
+          //save work to be initialized later with SMB information.
+          work = utils.createMapWork(context, root, sparkWork, null, true);
+          context.smbMapJoinCtxMap.get(smbOp).mapWork = (MapWork) work;
         }
       } else {
         work = utils.createReduceWork(context, root, sparkWork);

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java 
(original)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java 
Wed Feb  4 21:32:34 2015
@@ -186,17 +186,30 @@ public class SparkCompiler extends TaskC
      *
     * Some of the other processors are expecting only one traversal beyond SMBJoinOp.
     * We need to traverse from the big-table path only, and stop traversing on the small-table path once we reach SMBJoinOp.
+     * Also add some SMB join information to the context, so we can properly annotate the MapWork later on.
      */
     opRules.put(new TypeRule(SMBMapJoinOperator.class),
       new NodeProcessor() {
         @Override
         public Object process(Node currNode, Stack<Node> stack,
                               NodeProcessorCtx procCtx, Object... os) throws SemanticException {
+          GenSparkProcContext context = (GenSparkProcContext) procCtx;
+          SMBMapJoinOperator currSmbNode = (SMBMapJoinOperator) currNode;
+          SparkSMBMapJoinInfo smbMapJoinCtx = context.smbMapJoinCtxMap.get(currSmbNode);
+          if (smbMapJoinCtx == null) {
+            smbMapJoinCtx = new SparkSMBMapJoinInfo();
+            context.smbMapJoinCtxMap.put(currSmbNode, smbMapJoinCtx);
+          }
+
           for (Node stackNode : stack) {
             if (stackNode instanceof DummyStoreOperator) {
+              //If coming from small-table side, do some book-keeping, and skip traversal.
+              smbMapJoinCtx.smallTableRootOps.add(context.currentRootOperator);
               return true;
             }
           }
+          //If coming from big-table side, do some book-keeping, and continue traversal
+          smbMapJoinCtx.bigTableRootOp = context.currentRootOperator;
           return false;
         }
       }
@@ -210,24 +223,14 @@ public class SparkCompiler extends TaskC
     GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
     ogw.startWalking(topNodes, null);
 
-
-    // ------------------- Second Pass -----------------------
-    // SMB Join optimizations to add the "localWork" and bucketing data structures to MapWork.
-    opRules.clear();
-    opRules.put(new TypeRule(SMBMapJoinOperator.class),
-       SparkSortMergeJoinFactory.getTableScanMapJoin());
-
-    disp = new DefaultRuleDispatcher(null, opRules, procCtx);
-    topNodes = new ArrayList<Node>();
-    topNodes.addAll(pCtx.getTopOps().values());
-    ogw = new GenSparkWorkWalker(disp, procCtx);
-    ogw.startWalking(topNodes, null);
-
     // we need to clone some operator plans and remove union operators still
     for (BaseWork w: procCtx.workWithUnionOperators) {
       GenSparkUtils.getUtils().removeUnionOperators(conf, procCtx, w);
     }
 
+    // we need to fill MapWork with 'local' work and bucket information for SMB Join.
+    GenSparkUtils.getUtils().annotateMapWork(procCtx);
+
     // finally make sure the file sink operators are set up right
     for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
       GenSparkUtils.getUtils().processFileSink(procCtx, fileSink);
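The new SMBMapJoinOperator rule above tells the two traversal paths apart purely by inspecting the walker's stack: every small-table branch reaches the SMB join through a DummyStoreOperator, while the big-table branch does not. Pulled out on its own, the check amounts to the following sketch (the wrapper class and method name are illustrative, not part of the patch):

import java.util.Stack;

import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
import org.apache.hadoop.hive.ql.lib.Node;

// Illustrative helper: true when the current walk came in via a small-table
// branch, i.e. a DummyStoreOperator sits on the traversal stack.
final class SmbPathCheck {
  static boolean cameFromSmallTableSide(Stack<Node> stack) {
    for (Node stackNode : stack) {
      if (stackNode instanceof DummyStoreOperator) {
        return true;
      }
    }
    return false;
  }
}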

Modified: 
hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- 
hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
 (original)
+++ 
hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
 Wed Feb  4 21:32:34 2015
@@ -232,6 +232,12 @@ public class Hadoop20SShims extends Hado
     throw new IOException("Cannot run tez on current hadoop, Version: " + 
VersionInfo.getVersion());
   }
 
+  @Override
+  public MiniMrShim getMiniSparkCluster(Configuration conf, int numberOfTaskTrackers,
+    String nameNode, int numDir) throws IOException {
+    throw new IOException("Cannot run Spark on YARN on current Hadoop, 
Version: " + VersionInfo.getVersion());
+  }
+
   /**
    * Shim for MiniMrCluster
    */

Modified: 
hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- 
hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
 (original)
+++ 
hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
 Wed Feb  4 21:32:34 2015
@@ -415,6 +415,73 @@ public class Hadoop23Shims extends Hadoo
     }
   }
 
+  /**
+   * Returns a shim to wrap MiniSparkOnYARNCluster
+   */
+  @Override
+  public MiniMrShim getMiniSparkCluster(Configuration conf, int numberOfTaskTrackers,
+    String nameNode, int numDir) throws IOException {
+    return new MiniSparkShim(conf, numberOfTaskTrackers, nameNode, numDir);
+  }
+
+  /**
+   * Shim for MiniSparkOnYARNCluster
+   */
+  public class MiniSparkShim extends Hadoop23Shims.MiniMrShim {
+
+    private final MiniSparkOnYARNCluster mr;
+    private final Configuration conf;
+
+    public MiniSparkShim(Configuration conf, int numberOfTaskTrackers,
+      String nameNode, int numDir) throws IOException {
+
+      mr = new MiniSparkOnYARNCluster("sparkOnYarn");
+      conf.set("fs.defaultFS", nameNode);
+      mr.init(conf);
+      mr.start();
+      this.conf = mr.getConfig();
+    }
+
+    @Override
+    public int getJobTrackerPort() throws UnsupportedOperationException {
+      String address = conf.get("yarn.resourcemanager.address");
+      address = StringUtils.substringAfterLast(address, ":");
+
+      if (StringUtils.isBlank(address)) {
+        throw new IllegalArgumentException("Invalid YARN resource manager port.");
+      }
+
+      return Integer.parseInt(address);
+    }
+
+    @Override
+    public void shutdown() throws IOException {
+      mr.stop();
+    }
+
+    @Override
+    public void setupConfiguration(Configuration conf) {
+      Configuration config = mr.getConfig();
+      for (Map.Entry<String, String> pair : config) {
+        conf.set(pair.getKey(), pair.getValue());
+      }
+
+      Path jarPath = new Path("hdfs:///user/hive");
+      Path hdfsPath = new Path("hdfs:///user/");
+      try {
+        FileSystem fs = cluster.getFileSystem();
+        jarPath = fs.makeQualified(jarPath);
+        conf.set("hive.jar.directory", jarPath.toString());
+        fs.mkdirs(jarPath);
+        hdfsPath = fs.makeQualified(hdfsPath);
+        conf.set("hive.user.install.directory", hdfsPath.toString());
+        fs.mkdirs(hdfsPath);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
   // Don't move this code to the parent class. There's a binary
   // incompatibility between hadoop 1 and 2 wrt MiniDFSCluster and we
   // need to have two different shim classes even though they are
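To illustrate how a test harness could drive the new mini cluster through the shim layer (the cluster sizing, name-node URI and the wrapper class are assumptions for the example, not part of this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;

// Hypothetical test-harness usage of the new shim entry point.
final class MiniSparkClusterExample {
  static void runWithMiniSparkCluster(String nameNode) throws Exception {
    Configuration conf = new Configuration();
    HadoopShims shims = ShimLoader.getHadoopShims();
    HadoopShims.MiniMrShim sparkCluster =
        shims.getMiniSparkCluster(conf, 1, nameNode, 1);
    try {
      // Copy the mini cluster's configuration (RM address, etc.) into the test conf.
      sparkCluster.setupConfiguration(conf);
      // ... submit Hive-on-Spark queries against the cluster here ...
    } finally {
      sparkCluster.shutdown();
    }
  }
}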

Modified: 
hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- 
hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
 (original)
+++ 
hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
 Wed Feb  4 21:32:34 2015
@@ -93,7 +93,10 @@ public interface HadoopShims {
       String nameNode, int numDir) throws IOException;
 
   public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers,
-                                     String nameNode, int numDir) throws IOException;
+      String nameNode, int numDir) throws IOException;
+
+  public MiniMrShim getMiniSparkCluster(Configuration conf, int numberOfTaskTrackers,
+      String nameNode, int numDir) throws IOException;
 
   /**
    * Shim for MiniMrCluster

Modified: 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
 (original)
+++ 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
 Wed Feb  4 21:32:34 2015
@@ -53,4 +53,9 @@ public interface JobContext {
    */
   Map<String, List<JavaFutureAction<?>>> getMonitoredJobs();
 
+  /**
+   * Return all jar paths that were added through AddJarJob.
+   */
+  List<String> getAddedJars();
+
 }
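A hedged sketch of how a later job might consume this list (the Job interface shape follows the AddJarJob change further down in this commit; the consumer class itself is illustrative, not part of the patch):

import java.io.Serializable;
import java.util.List;

import org.apache.hive.spark.client.Job;
import org.apache.hive.spark.client.JobContext;

// Illustrative consumer: a remote job that inspects the jars previously
// registered through AddJarJob before doing its own work.
class ListAddedJarsJob implements Job<Serializable> {
  @Override
  public Serializable call(JobContext jc) throws Exception {
    List<String> jars = jc.getAddedJars();
    // e.g. verify that the classes this job needs were already shipped
    return jars.isEmpty() ? "no jars added" : jars.get(0);
  }
}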

Modified: 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
 (original)
+++ 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
 Wed Feb  4 21:32:34 2015
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.hive.spark.counter.SparkCounters;
 
@@ -32,11 +33,13 @@ class JobContextImpl implements JobConte
   private final JavaSparkContext sc;
   private final ThreadLocal<MonitorCallback> monitorCb;
   private final Map<String, List<JavaFutureAction<?>>> monitoredJobs;
+  private final List<String> addedJars;
 
   public JobContextImpl(JavaSparkContext sc) {
     this.sc = sc;
     this.monitorCb = new ThreadLocal<MonitorCallback>();
     monitoredJobs = new ConcurrentHashMap<String, List<JavaFutureAction<?>>>();
+    addedJars = new CopyOnWriteArrayList<String>();
   }
 
 
@@ -57,6 +60,11 @@ class JobContextImpl implements JobConte
     return monitoredJobs;
   }
 
+  @Override
+  public List<String> getAddedJars() {
+    return addedJars;
+  }
+
   void setMonitorCb(MonitorCallback cb) {
     monitorCb.set(cb);
   }

Modified: 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
 (original)
+++ 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
 Wed Feb  4 21:32:34 2015
@@ -106,6 +106,8 @@ public class RemoteDriver {
         serverAddress = getArg(args, idx);
       } else if (key.equals("--remote-port")) {
         serverPort = Integer.parseInt(getArg(args, idx));
+      } else if (key.equals("--client-id")) {
+        conf.set(SparkClientFactory.CONF_CLIENT_ID, getArg(args, idx));
       } else if (key.equals("--secret")) {
         conf.set(SparkClientFactory.CONF_KEY_SECRET, getArg(args, idx));
       } else if (key.equals("--conf")) {
@@ -127,6 +129,8 @@ public class RemoteDriver {
       LOG.debug("Remote Driver configured with: " + e._1() + "=" + e._2());
     }
 
+    String clientId = mapConf.get(SparkClientFactory.CONF_CLIENT_ID);
+    Preconditions.checkArgument(clientId != null, "No client ID provided.");
     String secret = mapConf.get(SparkClientFactory.CONF_KEY_SECRET);
     Preconditions.checkArgument(secret != null, "No secret provided.");
 
@@ -140,8 +144,8 @@ public class RemoteDriver {
     this.protocol = new DriverProtocol();
 
     // The RPC library takes care of timing out this.
-    this.clientRpc = Rpc.createClient(mapConf, egroup, serverAddress, serverPort, secret, protocol)
-      .get();
+    this.clientRpc = Rpc.createClient(mapConf, egroup, serverAddress, serverPort,
+      clientId, secret, protocol).get();
     this.running = true;
 
     this.clientRpc.addListener(new Rpc.Listener() {
@@ -354,6 +358,11 @@ public class RemoteDriver {
         if (sparkCounters != null) {
           counters = sparkCounters.snapshot();
         }
+        // make sure job has really succeeded
+        // at this point, future.get shall not block us
+        for (JavaFutureAction<?> future : jobs) {
+          future.get();
+        }
         protocol.jobFinished(req.id, result, null, counters);
       } catch (Throwable t) {
        // Catch throwables in a best-effort to report job status back to the client. It's
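The new future.get() loop above runs only after all monitored jobs have completed, so the calls return immediately; their purpose is to surface a failure that would otherwise be hidden behind a successfully returned result. A minimal sketch of that idea (helper class and method name are illustrative):

import java.util.List;

import org.apache.spark.api.java.JavaFutureAction;

// Illustrative: get() on an already-completed future does not block, but it
// still rethrows any job failure (wrapped in an ExecutionException), so the
// driver reports the failure to the client instead of a bogus success.
final class JobResultCheck {
  static void verifyAllSucceeded(List<JavaFutureAction<?>> jobs) throws Exception {
    for (JavaFutureAction<?> future : jobs) {
      future.get();
    }
  }
}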

Modified: 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
 (original)
+++ 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
 Wed Feb  4 21:32:34 2015
@@ -37,6 +37,9 @@ public final class SparkClientFactory {
   /** Used to run the driver in-process, mostly for testing. */
   static final String CONF_KEY_IN_PROCESS = "spark.client.do_not_use.run_driver_in_process";
 
+  /** Used by client and driver to share a client ID for establishing an RPC session. */
+  static final String CONF_CLIENT_ID = "spark.client.authentication.client_id";
+
   /** Used by client and driver to share a secret for establishing an RPC session. */
   static final String CONF_KEY_SECRET = "spark.client.authentication.secret";
 

Modified: 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
 (original)
+++ 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
 Wed Feb  4 21:32:34 2015
@@ -64,9 +64,11 @@ class SparkClientImpl implements SparkCl
 
   private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds
 
+  private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS";
   private static final String DRIVER_OPTS_KEY = "spark.driver.extraJavaOptions";
   private static final String EXECUTOR_OPTS_KEY = "spark.executor.extraJavaOptions";
   private static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
+  private static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
 
   private final Map<String, String> conf;
   private final HiveConf hiveConf;
@@ -83,13 +85,14 @@ class SparkClientImpl implements SparkCl
     this.childIdGenerator = new AtomicInteger();
     this.jobs = Maps.newConcurrentMap();
 
+    String clientId = UUID.randomUUID().toString();
     String secret = rpcServer.createSecret();
-    this.driverThread = startDriver(rpcServer, secret);
+    this.driverThread = startDriver(rpcServer, clientId, secret);
     this.protocol = new ClientProtocol();
 
     try {
       // The RPC server will take care of timeouts here.
-      this.driverRpc = rpcServer.registerClient(secret, protocol).get();
+      this.driverRpc = rpcServer.registerClient(clientId, secret, protocol).get();
     } catch (Exception e) {
       LOG.warn("Error while waiting for client to connect.", e);
       driverThread.interrupt();
@@ -173,7 +176,8 @@ class SparkClientImpl implements SparkCl
     protocol.cancel(jobId);
   }
 
-  private Thread startDriver(RpcServer rpcServer, final String secret) throws IOException {
+  private Thread startDriver(RpcServer rpcServer, final String clientId, final String secret)
+      throws IOException {
     Runnable runnable;
     final String serverAddress = rpcServer.getAddress();
     final String serverPort = String.valueOf(rpcServer.getPort());
@@ -189,6 +193,8 @@ class SparkClientImpl implements SparkCl
           args.add(serverAddress);
           args.add("--remote-port");
           args.add(serverPort);
+          args.add("--client-id");
+          args.add(clientId);
           args.add("--secret");
           args.add(secret);
 
@@ -219,10 +225,16 @@ class SparkClientImpl implements SparkCl
           sparkLogDir = sparkHome + "/logs/";
         }
       }
+
+      String osxTestOpts = "";
+      if (Strings.nullToEmpty(System.getProperty("os.name")).toLowerCase().contains("mac")) {
+        osxTestOpts = Strings.nullToEmpty(System.getenv(OSX_TEST_OPTS));
+      }
+
       String driverJavaOpts = Joiner.on(" ").skipNulls().join(
-          "-Dhive.spark.log.dir=" + sparkLogDir, conf.get(DRIVER_OPTS_KEY));
+          "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, 
conf.get(DRIVER_OPTS_KEY));
       String executorJavaOpts = Joiner.on(" ").skipNulls().join(
-          "-Dhive.spark.log.dir=" + sparkLogDir, conf.get(EXECUTOR_OPTS_KEY));
+          "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, 
conf.get(EXECUTOR_OPTS_KEY));
 
       // Create a file with all the job properties to be read by spark-submit. Change the
       // file's permissions so that only the owner can read it. This avoid having the
@@ -236,18 +248,30 @@ class SparkClientImpl implements SparkCl
       for (Map.Entry<String, String> e : conf.entrySet()) {
         allProps.put(e.getKey(), conf.get(e.getKey()));
       }
+      allProps.put(SparkClientFactory.CONF_CLIENT_ID, clientId);
       allProps.put(SparkClientFactory.CONF_KEY_SECRET, secret);
       allProps.put(DRIVER_OPTS_KEY, driverJavaOpts);
       allProps.put(EXECUTOR_OPTS_KEY, executorJavaOpts);
 
-      String hiveHadoopTestClasspath = Strings.nullToEmpty(System.getenv("HIVE_HADOOP_TEST_CLASSPATH"));
-      if (!hiveHadoopTestClasspath.isEmpty()) {
-        String extraClasspath = Strings.nullToEmpty((String)allProps.get(DRIVER_EXTRA_CLASSPATH));
-        if (extraClasspath.isEmpty()) {
-          allProps.put(DRIVER_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
-        } else {
-          extraClasspath = extraClasspath.endsWith(File.pathSeparator) ? extraClasspath : extraClasspath + File.pathSeparator;
-          allProps.put(DRIVER_EXTRA_CLASSPATH, extraClasspath + hiveHadoopTestClasspath);
+      String isTesting = conf.get("spark.testing");
+      if (isTesting != null && isTesting.equalsIgnoreCase("true")) {
+        String hiveHadoopTestClasspath = Strings.nullToEmpty(System.getenv("HIVE_HADOOP_TEST_CLASSPATH"));
+        if (!hiveHadoopTestClasspath.isEmpty()) {
+          String extraDriverClasspath = Strings.nullToEmpty((String)allProps.get(DRIVER_EXTRA_CLASSPATH));
+          if (extraDriverClasspath.isEmpty()) {
+            allProps.put(DRIVER_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
+          } else {
+            extraDriverClasspath = extraDriverClasspath.endsWith(File.pathSeparator) ? extraDriverClasspath : extraDriverClasspath + File.pathSeparator;
+            allProps.put(DRIVER_EXTRA_CLASSPATH, extraDriverClasspath + hiveHadoopTestClasspath);
+          }
+
+          String extraExecutorClasspath = Strings.nullToEmpty((String)allProps.get(EXECUTOR_EXTRA_CLASSPATH));
+          if (extraExecutorClasspath.isEmpty()) {
+            allProps.put(EXECUTOR_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
+          } else {
+            extraExecutorClasspath = extraExecutorClasspath.endsWith(File.pathSeparator) ? extraExecutorClasspath : extraExecutorClasspath + File.pathSeparator;
+            allProps.put(EXECUTOR_EXTRA_CLASSPATH, extraExecutorClasspath + hiveHadoopTestClasspath);
+          }
         }
       }
 
@@ -350,6 +374,9 @@ class SparkClientImpl implements SparkCl
       LOG.debug("Running client driver with argv: {}", Joiner.on(" 
").join(argv));
 
       ProcessBuilder pb = new ProcessBuilder(argv.toArray(new String[argv.size()]));
+      if (isTesting != null) {
+        pb.environment().put("SPARK_TESTING", isTesting);
+      }
       final Process child = pb.start();
 
       int childId = childIdGenerator.incrementAndGet();
@@ -529,6 +556,9 @@ class SparkClientImpl implements SparkCl
     @Override
     public Serializable call(JobContext jc) throws Exception {
       jc.sc().addJar(path);
+      // Subsequent remote jobs may refer to classes in this jar, and they may run in a
+      // different thread, so record the jar path in the JobContext for later use.
+      jc.getAddedJars().add(path);
       return null;
     }
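Both the driver and the executor branches above repeat the same append-with-separator logic on a classpath property. Purely as an illustration (assuming the property bag is a java.util.Properties, as the casts above suggest; the helper class and method name are hypothetical), the shared logic could be expressed once like this:

import java.io.File;
import java.util.Properties;

import com.google.common.base.Strings;

// Hypothetical helper: append extraPath to an existing classpath property,
// inserting a path separator only when one is needed.
final class ClasspathProps {
  static void appendClasspath(Properties props, String key, String extraPath) {
    String existing = Strings.nullToEmpty((String) props.get(key));
    if (existing.isEmpty()) {
      props.put(key, extraPath);
    } else if (existing.endsWith(File.pathSeparator)) {
      props.put(key, existing + extraPath);
    } else {
      props.put(key, existing + File.pathSeparator + extraPath);
    }
  }
}

With a helper of that shape, the driver and executor cases would each collapse to a single call such as appendClasspath(allProps, DRIVER_EXTRA_CLASSPATH, hiveHadoopTestClasspath).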
 

Modified: 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
 (original)
+++ 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
 Wed Feb  4 21:32:34 2015
@@ -18,6 +18,7 @@
 package org.apache.hive.spark.client.rpc;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
@@ -63,9 +64,12 @@ class KryoMessageCodec extends ByteToMes
     }
   };
 
+  private volatile EncryptionHandler encryptionHandler;
+
   public KryoMessageCodec(int maxMessageSize, Class<?>... messages) {
     this.maxMessageSize = maxMessageSize;
     this.messages = Arrays.asList(messages);
+    this.encryptionHandler = null;
   }
 
   @Override
@@ -86,7 +90,7 @@ class KryoMessageCodec extends ByteToMes
     }
 
     try {
-      ByteBuffer nioBuffer = in.nioBuffer(in.readerIndex(), msgSize);
+      ByteBuffer nioBuffer = maybeDecrypt(in.nioBuffer(in.readerIndex(), msgSize));
       Input kryoIn = new Input(new ByteBufferInputStream(nioBuffer));
 
       Object msg = kryos.get().readClassAndObject(kryoIn);
@@ -106,7 +110,7 @@ class KryoMessageCodec extends ByteToMes
     kryos.get().writeClassAndObject(kryoOut, msg);
     kryoOut.flush();
 
-    byte[] msgData = bytes.toByteArray();
+    byte[] msgData = maybeEncrypt(bytes.toByteArray());
     LOG.debug("Encoded message of type {} ({} bytes)", 
msg.getClass().getName(), msgData.length);
     checkSize(msgData.length);
 
@@ -115,10 +119,56 @@ class KryoMessageCodec extends ByteToMes
     buf.writeBytes(msgData);
   }
 
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    if (encryptionHandler != null) {
+      encryptionHandler.dispose();
+    }
+    super.channelInactive(ctx);
+  }
+
   private void checkSize(int msgSize) {
     Preconditions.checkArgument(msgSize > 0, "Message size (%s bytes) must be positive.", msgSize);
     Preconditions.checkArgument(maxMessageSize <= 0 || msgSize <= maxMessageSize,
         "Message (%s bytes) exceeds maximum allowed size (%s bytes).", msgSize, maxMessageSize);
   }
 
+  private byte[] maybeEncrypt(byte[] data) throws Exception {
+    return (encryptionHandler != null) ? encryptionHandler.wrap(data, 0, data.length) : data;
+  }
+
+  private ByteBuffer maybeDecrypt(ByteBuffer data) throws Exception {
+    if (encryptionHandler != null) {
+      byte[] encrypted;
+      int len = data.limit() - data.position();
+      int offset;
+      if (data.hasArray()) {
+        encrypted = data.array();
+        offset = data.position() + data.arrayOffset();
+        data.position(data.limit());
+      } else {
+        encrypted = new byte[len];
+        offset = 0;
+        data.get(encrypted);
+      }
+      return ByteBuffer.wrap(encryptionHandler.unwrap(encrypted, offset, len));
+    } else {
+      return data;
+    }
+  }
+
+  void setEncryptionHandler(EncryptionHandler handler) {
+    this.encryptionHandler = handler;
+  }
+
+  interface EncryptionHandler {
+
+    byte[] wrap(byte[] data, int offset, int len) throws IOException;
+
+    byte[] unwrap(byte[] data, int offset, int len) throws IOException;
+
+    void dispose() throws IOException;
+
+  }
+
 }
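The new EncryptionHandler hook mirrors the javax.security.sasl wrap/unwrap/dispose contract, so a SASL-backed implementation is essentially a thin delegate. A hedged sketch along those lines (assuming a SaslClient negotiated with a confidentiality QOP and a class living in the same package as the codec; this is not necessarily the implementation used in this merge):

package org.apache.hive.spark.client.rpc;

import java.io.IOException;

import javax.security.sasl.SaslClient;

// Illustrative EncryptionHandler backed by a negotiated SaslClient; once it is
// registered via setEncryptionHandler(), every encoded message is wrapped and
// every decoded buffer is unwrapped by the SASL layer.
class SaslEncryptionHandler implements KryoMessageCodec.EncryptionHandler {

  private final SaslClient client;

  SaslEncryptionHandler(SaslClient client) {
    this.client = client;
  }

  @Override
  public byte[] wrap(byte[] data, int offset, int len) throws IOException {
    return client.wrap(data, offset, len);
  }

  @Override
  public byte[] unwrap(byte[] data, int offset, int len) throws IOException {
    return client.unwrap(data, offset, len);
  }

  @Override
  public void dispose() throws IOException {
    client.dispose();
  }
}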

Modified: 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/README.md
URL: 
http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/README.md?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/README.md
 (original)
+++ 
hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/README.md
 Wed Feb  4 21:32:34 2015
@@ -6,7 +6,7 @@ Basic flow of events:
 - Client side creates an RPC server
 - Client side spawns RemoteDriver, which manages the SparkContext, and 
provides a secret
 - Client side sets up a timer to wait for RemoteDriver to connect back
-- RemoteDriver connects back to client, sends Hello message with secret
+- RemoteDriver connects back to client, SASL handshake ensues
 - Connection is established and now there's a session between the client and 
the driver.
 
 Features of the RPC layer:

