Author: xuefu
Date: Wed Feb 4 21:32:34 2015
New Revision: 1657405

URL: http://svn.apache.org/r1657405
Log:
HIVE-9572: Merge from Spark branch to trunk 02/03/2015 (reviewed by Brock)
Added:
    hive/trunk/data/conf/spark/standalone/
      - copied from r1657401, hive/branches/spark/data/conf/spark/standalone/
    hive/trunk/data/conf/spark/yarn-client/
      - copied from r1657401, hive/branches/spark/data/conf/spark/yarn-client/
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkSMBMapJoinInfo.java
      - copied unchanged from r1657401, hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkSMBMapJoinInfo.java
    hive/trunk/ql/src/test/queries/clientpositive/lateral_view_explode2.q
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/queries/clientpositive/lateral_view_explode2.q
    hive/trunk/ql/src/test/results/clientpositive/lateral_view_explode2.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/lateral_view_explode2.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/bucket5.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket5.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/bucket6.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket6.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/bucketizedhiveinputformat.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/bucketizedhiveinputformat.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/constprog_partitioner.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/constprog_partitioner.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/empty_dir_in_table.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/empty_dir_in_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/external_table_with_space_in_location_path.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/external_table_with_space_in_location_path.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/file_with_header_footer.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/file_with_header_footer.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/import_exported_table.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/import_exported_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/index_bitmap3.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/index_bitmap3.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/index_bitmap_auto.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/index_bitmap_auto.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/infer_bucket_sort_bucketed_table.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/infer_bucket_sort_bucketed_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/infer_bucket_sort_map_operators.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/infer_bucket_sort_map_operators.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/infer_bucket_sort_merge.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/infer_bucket_sort_merge.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/infer_bucket_sort_num_buckets.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/infer_bucket_sort_num_buckets.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/infer_bucket_sort_reducers_power_two.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/infer_bucket_sort_reducers_power_two.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/input16_cc.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/input16_cc.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/list_bucket_dml_10.q.java1.7.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/list_bucket_dml_10.q.java1.7.out
    hive/trunk/ql/src/test/results/clientpositive/spark/load_fs2.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/load_fs2.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/load_hdfs_file_with_space_in_the_name.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/load_hdfs_file_with_space_in_the_name.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/ql_rewrite_gbtoidx.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/ql_rewrite_gbtoidx.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/ql_rewrite_gbtoidx_cbo_1.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/ql_rewrite_gbtoidx_cbo_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/quotedid_smb.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/quotedid_smb.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/reduce_deduplicate.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/reduce_deduplicate.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/remote_script.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/remote_script.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/root_dir_external_table.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/root_dir_external_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/schemeAuthority.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/schemeAuthority.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/schemeAuthority2.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/schemeAuthority2.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/temp_table_external.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/temp_table_external.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/truncate_column_buckets.q.out
      - copied unchanged from r1657401,
        hive/branches/spark/ql/src/test/results/clientpositive/spark/truncate_column_buckets.q.out
    hive/trunk/ql/src/test/results/clientpositive/spark/uber_reduce.q.out
      - copied unchanged from r1657401, hive/branches/spark/ql/src/test/results/clientpositive/spark/uber_reduce.q.out
    hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/MiniSparkOnYARNCluster.java
      - copied unchanged from r1657401, hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/MiniSparkOnYARNCluster.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
      - copied unchanged from r1657401, hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/SaslHandler.java
      - copied unchanged from r1657401, hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/SaslHandler.java

Removed:
    hive/trunk/data/conf/spark/hive-site.xml

Modified:
    hive/trunk/   (props changed)
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/hbase-handler/pom.xml   (props changed)
    hive/trunk/itests/pom.xml
    hive/trunk/itests/qtest-spark/pom.xml
    hive/trunk/itests/src/test/resources/testconfiguration.properties
    hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
    hive/trunk/pom.xml
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
    hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
    hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
    hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/README.md
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
    hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
    hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestKryoMessageCodec.java
    hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java

Propchange: hive/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 4 21:32:34 2015
@@ -1,5 +1,5 @@
 /hive/branches/branch-0.11:1480385,1480458,1481120,1481344,1481346,1481348,1481352,1483872,1505184
 /hive/branches/cbo:1605012-1627125
-/hive/branches/spark:1608589-1654414
+/hive/branches/spark:1608589-1657401
 /hive/branches/tez:1494760-1622766
 /hive/branches/vectorization:1466908-1527856

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Feb 4 21:32:34 2015
@@ -2030,7 +2030,9 @@ public class HiveConf extends Configurat
     SPARK_RPC_MAX_MESSAGE_SIZE("hive.spark.client.rpc.max.size", 50 * 1024 * 1024,
       "Maximum message size in bytes for communication between Hive client and remote Spark driver. Default is 50MB."),
     SPARK_RPC_CHANNEL_LOG_LEVEL("hive.spark.client.channel.log.level", null,
-      "Channel logging level for remote Spark driver. One of {DEBUG, ERROR, INFO, TRACE, WARN}.");
+      "Channel logging level for remote Spark driver. One of {DEBUG, ERROR, INFO, TRACE, WARN}."),
+    SPARK_RPC_SASL_MECHANISM("hive.spark.client.rpc.sasl.mechanisms", "DIGEST-MD5",
+      "Name of the SASL mechanism to use for authentication.");

   public final String varname;
   private final String defaultExpr;
@@ -2270,10 +2272,33 @@ public class HiveConf extends Configurat
       throw new IllegalArgumentException("Cannot modify " + name + " at runtime. It is in the list"
           + "of parameters that can't be modified at runtime");
     }
-    isSparkConfigUpdated = name.startsWith("spark");
+    isSparkConfigUpdated = isSparkRelatedConfig(name);
     set(name, value);
   }

+  /**
+   * check whether spark related property is updated, which includes spark configurations,
+   * RSC configurations and yarn configuration in Spark on YARN mode.
+   * @param name
+   * @return
+   */
+  private boolean isSparkRelatedConfig(String name) {
+    boolean result = false;
+    if (name.startsWith("spark")) { // Spark property.
+      result = true;
+    } else if (name.startsWith("yarn")) { // YARN property in Spark on YARN mode.
+      String sparkMaster = get("spark.master");
+      if (sparkMaster != null &&
+        (sparkMaster.equals("yarn-client") || sparkMaster.equals("yarn-cluster"))) {
+        result = true;
+      }
+    } else if (name.startsWith("hive.spark")) { // Remote Spark Context property.
+      result = true;
+    }
+
+    return result;
+  }
+
   public static int getIntVar(Configuration conf, ConfVars var) {
     assert (var.valClass == Integer.class) : var.varname;
     return conf.getInt(var.varname, var.defaultIntVal);

Propchange: hive/trunk/hbase-handler/pom.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 4 21:32:34 2015
@@ -1,6 +1,6 @@
 /hive/branches/branch-0.11/hbase-handler/pom.xml:1480385,1480458,1481120,1481344,1481346,1481348,1481352,1483872,1505184
 /hive/branches/cbo/hbase-handler/pom.xml:1605012-1627125
-/hive/branches/spark/hbase-handler/pom.xml:1608589-1654414
+/hive/branches/spark/hbase-handler/pom.xml:1608589-1657401
 /hive/branches/tez/hbase-handler/pom.xml:1494760-1622766
 /hive/branches/vectorization/hbase-handler/pom.xml:1466908-1527856
 /hive/trunk/hbase-handler/pom.xml:1494760-1537575

Modified: hive/trunk/itests/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/itests/pom.xml?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/itests/pom.xml (original)
+++ hive/trunk/itests/pom.xml Wed Feb 4 21:32:34 2015
@@ -88,7 +88,7 @@
                       curl -Sso $DOWNLOAD_DIR/$tarName $url
                     fi
                     tar -zxf $DOWNLOAD_DIR/$tarName -C $BASE_DIR
-                    mv $BASE_DIR/${finalName}* $BASE_DIR/$finalName
+                    mv $BASE_DIR/spark-${spark.version}-bin-hadoop2-without-hive $BASE_DIR/$finalName
                   }
                   mkdir -p $DOWNLOAD_DIR
                   download "http://d3jw87u4immizc.cloudfront.net/spark-tarball/spark-${spark.version}-bin-hadoop2-without-hive.tgz" "spark"

Modified: hive/trunk/itests/qtest-spark/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/itests/qtest-spark/pom.xml?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/itests/qtest-spark/pom.xml (original)
+++ hive/trunk/itests/qtest-spark/pom.xml Wed Feb 4 21:32:34 2015
@@ -37,7 +37,6 @@
     <qfile></qfile>
     <qfile_regex></qfile_regex>
     <run_disabled>false</run_disabled>
-    <clustermode>spark</clustermode>
     <execute.beeline.tests>false</execute.beeline.tests>
     <active.hadoop.version>${hadoop-23.version}</active.hadoop.version>
     <test.dfs.mkdir>-mkdir -p</test.dfs.mkdir>
@@ -333,22 +332,39 @@
                 <taskdef name="qtestgen" classname="org.apache.hadoop.hive.ant.QTestGenTask"
                          classpath="${test.classpath}" />
                 <mkdir dir="${project.build.directory}/qfile-results/clientpositive/spark" />
+                <mkdir dir="${project.build.directory}/qfile-results/clientpositive/miniSparkOnYarn" />
                 <qtestgen hiveRootDirectory="${basedir}/${hive.path.to.root}/"
                           outputDirectory="${project.build.directory}/generated-test-sources/java/org/apache/hadoop/hive/cli/"
                           templatePath="${basedir}/${hive.path.to.root}/ql/src/test/templates/" template="TestCliDriver.vm"
                           queryDirectory="${basedir}/${hive.path.to.root}/ql/src/test/queries/clientpositive/"
                           queryFile="${qfile}"
                           queryFileRegex="${qfile_regex}"
-                          clusterMode="${clustermode}"
+                          clusterMode="spark"
                           includeQueryFile="${spark.query.files}"
                           runDisabled="${run_disabled}"
-                          hiveConfDir="${basedir}/${hive.path.to.root}/data/conf/spark"
+                          hiveConfDir="${basedir}/${hive.path.to.root}/data/conf/spark/standalone"
                           resultsDirectory="${basedir}/${hive.path.to.root}/ql/src/test/results/clientpositive/spark" className="TestSparkCliDriver"
                           logFile="${project.build.directory}/testsparkclidrivergen.log"
                           logDirectory="${project.build.directory}/qfile-results/clientpositive/spark"
                           initScript="q_test_init.sql"
                           cleanupScript="q_test_cleanup.sql"/>
+                <qtestgen hiveRootDirectory="${basedir}/${hive.path.to.root}/"
+                          outputDirectory="${project.build.directory}/generated-test-sources/java/org/apache/hadoop/hive/cli/"
+                          templatePath="${basedir}/${hive.path.to.root}/ql/src/test/templates/" template="TestCliDriver.vm"
+                          queryDirectory="${basedir}/${hive.path.to.root}/ql/src/test/queries/clientpositive/"
+                          queryFile="${qfile}"
+                          queryFileRegex="${qfile_regex}"
+                          clusterMode="miniSparkOnYarn"
+                          includeQueryFile="${miniSparkOnYarn.query.files}"
+                          runDisabled="${run_disabled}"
+                          hiveConfDir="${basedir}/${hive.path.to.root}/data/conf/spark/yarn-client"
+                          resultsDirectory="${basedir}/${hive.path.to.root}/ql/src/test/results/clientpositive/spark"
+                          className="TestMiniSparkOnYarnCliDriver"
+                          logFile="${project.build.directory}/testminisparkonyarnclidrivergen.log"
+                          logDirectory="${project.build.directory}/qfile-results/clientpositive/spark"
+                          initScript="q_test_init.sql"
+                          cleanupScript="q_test_cleanup.sql"/>
               </target>
             </configuration>
             <goals>

Modified: hive/trunk/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/trunk/itests/src/test/resources/testconfiguration.properties?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/trunk/itests/src/test/resources/testconfiguration.properties Wed Feb 4 21:32:34 2015
@@ -715,6 +715,7 @@ spark.query.files=add_part_multiple.q, \
   join_thrift.q, \
   join_vc.q, \
   join_view.q, \
+  lateral_view_explode2.q, \
   leftsemijoin.q, \
   leftsemijoin_mr.q, \
   limit_partition_metadataonly.q, \
@@ -1000,3 +1001,47 @@ spark.query.files=add_part_multiple.q, \
   vectorized_string_funcs.q, \
   vectorized_timestamp_funcs.q, \
   windowing.q
+
+miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
+  bucket4.q,\
+  bucket5.q,\
+  bucket6.q,\
+  bucketizedhiveinputformat.q,\
+  bucketmapjoin6.q,\
+  bucketmapjoin7.q,\
+  constprog_partitioner.q,\
+  disable_merge_for_bucketing.q,\
+  empty_dir_in_table.q,\
+  external_table_with_space_in_location_path.q,\
+  file_with_header_footer.q,\
+  import_exported_table.q,\
+  index_bitmap3.q,\
+  index_bitmap_auto.q,\
+  infer_bucket_sort_bucketed_table.q,\
+  infer_bucket_sort_map_operators.q,\
+  infer_bucket_sort_merge.q,\
+  infer_bucket_sort_num_buckets.q,\
+  infer_bucket_sort_reducers_power_two.q,\
+  input16_cc.q,\
+  leftsemijoin_mr.q,\
+  list_bucket_dml_10.q,\
+  load_fs2.q,\
+  load_hdfs_file_with_space_in_the_name.q,\
+  optrstat_groupby.q,\
+  parallel_orderby.q,\
+  ql_rewrite_gbtoidx.q,\
+  ql_rewrite_gbtoidx_cbo_1.q,\
+  quotedid_smb.q,\
+  reduce_deduplicate.q,\
+  remote_script.q,\
+  root_dir_external_table.q,\
+  schemeAuthority.q,\
+  schemeAuthority2.q,\
+  scriptfile1.q,\
+  scriptfile1_win.q,\
+  smb_mapjoin_8.q,\
+  stats_counter.q,\
+  stats_counter_partitioned.q,\
+  temp_table_external.q,\
+  truncate_column_buckets.q,\
+  uber_reduce.q

Modified: hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java (original)
+++ hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java Wed Feb 4 21:32:34 2015
@@ -302,6 +302,7 @@ public class QTestUtil {
     tez,
     spark,
     encrypted,
+    miniSparkOnYarn,
     none;

     public static MiniClusterType valueForString(String type) {
@@ -313,6 +314,8 @@ public class QTestUtil {
         return spark;
       } else if (type.equals("encrypted")) {
         return encrypted;
+      } else if (type.equals("miniSparkOnYarn")) {
+        return miniSparkOnYarn;
       } else {
         return none;
       }
@@ -380,6 +383,8 @@ public class QTestUtil {
       String uriString = WindowsPathUtil.getHdfsUriString(fs.getUri().toString());
       if (clusterType == MiniClusterType.tez) {
         mr = shims.getMiniTezCluster(conf, 4, uriString, 1);
+      } else if (clusterType == MiniClusterType.miniSparkOnYarn) {
+        mr = shims.getMiniSparkCluster(conf, 4, uriString, 1);
       } else {
         mr = shims.getMiniMrCluster(conf, 4, uriString, 1);
       }
@@ -875,7 +880,8 @@ public class QTestUtil {
     ss.setIsSilent(true);
     SessionState oldSs = SessionState.get();
-    if (oldSs != null && (clusterType == MiniClusterType.tez || clusterType == MiniClusterType.spark)) {
+    if (oldSs != null && (clusterType == MiniClusterType.tez || clusterType == MiniClusterType.spark
+        || clusterType == MiniClusterType.miniSparkOnYarn)) {
       sparkSession = oldSs.getSparkSession();
       ss.setSparkSession(sparkSession);
       oldSs.setSparkSession(null);
@@ -935,7 +941,8 @@ public class QTestUtil {
     ss.err = System.out;
     SessionState oldSs = SessionState.get();
-    if (oldSs != null && (clusterType == MiniClusterType.tez || clusterType == MiniClusterType.spark)) {
+    if (oldSs != null && (clusterType == MiniClusterType.tez || clusterType == MiniClusterType.spark
+        || clusterType == MiniClusterType.miniSparkOnYarn)) {
       sparkSession = oldSs.getSparkSession();
       ss.setSparkSession(sparkSession);
       oldSs.setSparkSession(null);

Modified: hive/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/pom.xml?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/pom.xml (original)
+++ hive/trunk/pom.xml Wed Feb 4 21:32:34 2015
@@ -825,6 +825,7 @@
             <HADOOP_CLASSPATH>${test.tmp.dir}/conf:${basedir}/${hive.path.to.root}/conf</HADOOP_CLASSPATH>
             <HIVE_HADOOP_TEST_CLASSPATH>${test.hive.hadoop.classpath}</HIVE_HADOOP_TEST_CLASSPATH>
             <SPARK_SUBMIT_CLASSPATH>${spark.home}/lib/spark-assembly-${spark.version}-hadoop2.4.0.jar:${test.hive.hadoop.classpath}</SPARK_SUBMIT_CLASSPATH>
+            <SPARK_OSX_TEST_OPTS>-Dorg.xerial.snappy.tempdir=/tmp -Dorg.xerial.snappy.lib.name=libsnappyjava.jnilib</SPARK_OSX_TEST_OPTS>
             <PATH>${env.PATH}${test.extra.path}</PATH>
           </environmentVariables>
           <systemPropertyVariables>

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Feb 4 21:32:34 2015
@@ -207,6 +207,7 @@ public final class Utilities {
   public static final String INPUT_NAME = "iocontext.input.name";
   public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class";
   public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class";
+  public static final String HIVE_ADDED_JARS = "hive.added.jars";

   /**
    * ReduceField:
@@ -364,6 +365,18 @@ public final class Utilities {
     Path path = null;
     InputStream in = null;
     try {
+      String engine = HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE);
+      if (engine.equals("spark")) {
+        // TODO Add jar into current thread context classloader as it may be invoked by Spark driver inside
+        // threads, should be unnecessary while SPARK-5377 is resolved.
+        String addedJars = conf.get(HIVE_ADDED_JARS);
+        if (addedJars != null && !addedJars.isEmpty()) {
+          ClassLoader loader = Thread.currentThread().getContextClassLoader();
+          ClassLoader newLoader = addToClassPath(loader, addedJars.split(";"));
+          Thread.currentThread().setContextClassLoader(newLoader);
+        }
+      }
+
       path = getPlanPath(conf, name);
       LOG.info("PLAN PATH = " + path);
       assert path != null;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java Wed Feb 4 21:32:34 2015
@@ -42,6 +42,7 @@ public class HiveSparkClientFactory {
   private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
   private static final String SPARK_DEFAULT_MASTER = "local";
   private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";
+  private static final String SPARK_DEFAULT_SERIALIZER = "org.apache.spark.serializer.KryoSerializer";

   public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf)
     throws IOException, SparkException {
@@ -64,8 +65,7 @@ public class HiveSparkClientFactory {
     // set default spark configurations.
     sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
     sparkConf.put("spark.app.name", SPARK_DEFAULT_APP_NAME);
-    sparkConf.put("spark.serializer",
-      "org.apache.spark.serializer.KryoSerializer");
+    sparkConf.put("spark.serializer", SPARK_DEFAULT_SERIALIZER);

     // load properties from spark-defaults.conf.
     InputStream inputStream = null;
@@ -81,7 +81,7 @@ public class HiveSparkClientFactory {
           String value = properties.getProperty(propertyName);
           sparkConf.put(propertyName, properties.getProperty(propertyName));
           LOG.info(String.format(
-            "load spark configuration from %s (%s -> %s).",
+            "load spark property from %s (%s -> %s).",
             SPARK_DEFAULT_CONF_FILE, propertyName, value));
         }
       }
@@ -99,22 +99,36 @@ public class HiveSparkClientFactory {
       }
     }

-    // load properties from hive configurations, including both spark.* properties
-    // and properties for remote driver RPC.
+    // load properties from hive configurations, including both spark.* properties,
+    // properties for remote driver RPC, and yarn properties for Spark on YARN mode.
+    String sparkMaster = hiveConf.get("spark.master");
+    if (sparkMaster == null) {
+      sparkMaster = sparkConf.get("spark.master");
+    }
     for (Map.Entry<String, String> entry : hiveConf) {
       String propertyName = entry.getKey();
       if (propertyName.startsWith("spark")) {
         String value = hiveConf.get(propertyName);
         sparkConf.put(propertyName, value);
         LOG.info(String.format(
-          "load spark configuration from hive configuration (%s -> %s).",
+          "load spark property from hive configuration (%s -> %s).",
           propertyName, value));
+      } else if (propertyName.startsWith("yarn") &&
+        (sparkMaster.equals("yarn-client") || sparkMaster.equals("yarn-cluster"))) {
+        String value = hiveConf.get(propertyName);
+        // Add spark.hadoop prefix for yarn properties as SparkConf only accept properties
+        // started with spark prefix, Spark would remove spark.hadoop prefix lately and add
+        // it to its hadoop configuration.
+        sparkConf.put("spark.hadoop." + propertyName, value);
+        LOG.info(String.format(
+          "load yarn property from hive configuration in %s mode (%s -> %s).",
+          sparkMaster, propertyName, value));
       }
       if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) {
         String value = RpcConfiguration.getValue(hiveConf, propertyName);
         sparkConf.put(propertyName, value);
         LOG.info(String.format(
-          "load RPC configuration from hive configuration (%s -> %s).",
+          "load RPC property from hive configuration (%s -> %s).",
           propertyName, value));
       }
     }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Wed Feb 4 21:32:34 2015
@@ -17,15 +17,20 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;

+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;

+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,15 +53,13 @@ import org.apache.hive.spark.client.JobC
 import org.apache.hive.spark.client.JobHandle;
 import org.apache.hive.spark.client.SparkClient;
 import org.apache.hive.spark.client.SparkClientFactory;
+import org.apache.hive.spark.client.SparkClientUtilities;
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaPairRDD;

-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-
 /**
  * RemoteSparkClient is a wrapper of {@link org.apache.hive.spark.client.SparkClient}, which
  * wrap a spark job request and send to an remote SparkContext.
@@ -74,8 +77,8 @@ public class RemoteHiveSparkClient imple
   private transient SparkConf sparkConf;
   private transient HiveConf hiveConf;

-  private transient List<String> localJars = new ArrayList<String>();
-  private transient List<String> localFiles = new ArrayList<String>();
+  private transient List<URL> localJars = new ArrayList<URL>();
+  private transient List<URL> localFiles = new ArrayList<URL>();

   private final transient long sparkClientTimtout;

@@ -159,26 +162,28 @@ public class RemoteHiveSparkClient imple

   private void addResources(String addedFiles) {
     for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) {
-      if (!localFiles.contains(addedFile)) {
-        localFiles.add(addedFile);
-        try {
-          remoteClient.addFile(SparkUtilities.getURL(addedFile));
-        } catch (MalformedURLException e) {
-          LOG.warn("Failed to add file:" + addedFile);
+      try {
+        URL fileUrl = SparkUtilities.getURL(addedFile);
+        if (fileUrl != null && !localFiles.contains(fileUrl)) {
+          localFiles.add(fileUrl);
+          remoteClient.addFile(fileUrl);
         }
+      } catch (MalformedURLException e) {
+        LOG.warn("Failed to add file:" + addedFile);
       }
     }
   }

   private void addJars(String addedJars) {
     for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) {
-      if (!localJars.contains(addedJar)) {
-        localJars.add(addedJar);
-        try {
-          remoteClient.addJar(SparkUtilities.getURL(addedJar));
-        } catch (MalformedURLException e) {
-          LOG.warn("Failed to add jar:" + addedJar);
+      try {
+        URL jarUrl = SparkUtilities.getURL(addedJar);
+        if (jarUrl != null && !localJars.contains(jarUrl)) {
+          localJars.add(jarUrl);
+          remoteClient.addJar(jarUrl);
         }
+      } catch (MalformedURLException e) {
+        LOG.warn("Failed to add jar:" + addedJar);
       }
     }
   }
@@ -208,6 +213,15 @@ public class RemoteHiveSparkClient imple
     @Override
     public Serializable call(JobContext jc) throws Exception {
       JobConf localJobConf = KryoSerializer.deserializeJobConf(jobConfBytes);
+
+      // Add jar to current thread class loader dynamically, and add jar paths to JobConf as Spark
+      // may need to load classes from this jar in other threads.
+      List<String> addedJars = jc.getAddedJars();
+      if (addedJars != null && !addedJars.isEmpty()) {
+        SparkClientUtilities.addToClassPath(addedJars.toArray(new String[addedJars.size()]));
+        localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";"));
+      }
+
       Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);
       SparkWork localSparkWork = KryoSerializer.deserialize(sparkWorkBytes, SparkWork.class);
@@ -234,7 +248,6 @@ public class RemoteHiveSparkClient imple
       jc.monitor(future, sparkCounters, plan.getCachedRDDIds());
       return null;
     }
-
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Wed Feb 4 21:32:34 2015
@@ -233,6 +233,7 @@ public class SparkPlanGenerator {
       throw new IllegalArgumentException(msg, e);
     }
     if (work instanceof MapWork) {
+      cloned.setBoolean("mapred.task.is.map", true);
       List<Path> inputPaths = Utilities.getInputPaths(cloned, (MapWork) work,
           scratchDir, context, false);
       Utilities.setInputPaths(cloned, inputPaths);
@@ -250,6 +251,7 @@ public class SparkPlanGenerator {
       // remember the JobConf cloned for each MapWork, so we won't clone for it again
       workToJobConf.put(work, cloned);
     } else if (work instanceof ReduceWork) {
+      cloned.setBoolean("mapred.task.is.map", false);
       Utilities.setReduceWork(cloned, (ReduceWork) work, scratchDir, false);
       Utilities.createTmpDirs(cloned, (ReduceWork) work);
       cloned.set(Utilities.MAPRED_REDUCER_CLASS, ExecReducer.class.getName());

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java Wed Feb 4 21:32:34 2015
@@ -22,6 +22,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;

+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
@@ -43,6 +45,7 @@ import com.google.common.collect.Maps;
 public class LocalSparkJobStatus implements SparkJobStatus {

   private final JavaSparkContext sparkContext;
+  private static final Log LOG = LogFactory.getLog(LocalSparkJobStatus.class.getName());
   private int jobId;
   // After SPARK-2321, we only use JobMetricsListener to get job metrics
   // TODO: remove it when the new API provides equivalent functionality
@@ -69,16 +72,20 @@ public class LocalSparkJobStatus impleme

   @Override
   public JobExecutionStatus getState() {
+    SparkJobInfo sparkJobInfo = getJobInfo();
     // For spark job with empty source data, it's not submitted actually, so we would never
     // receive JobStart/JobEnd event in JobStateListener, use JavaFutureAction to get current
     // job state.
-    if (future.isDone()) {
+    if (sparkJobInfo == null && future.isDone()) {
+      try {
+        future.get();
+      } catch (Exception e) {
+        LOG.error("Failed to run job " + jobId, e);
+        return JobExecutionStatus.FAILED;
+      }
       return JobExecutionStatus.SUCCEEDED;
-    } else {
-      // SparkJobInfo may not be available yet
-      SparkJobInfo sparkJobInfo = getJobInfo();
-      return sparkJobInfo == null ? null : sparkJobInfo.status();
     }
+    return sparkJobInfo == null ? null : sparkJobInfo.status();
   }

   @Override

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java Wed Feb 4 21:32:34 2015
@@ -179,7 +179,15 @@ public class RemoteSparkJobStatus implem
       if (list != null && list.size() == 1) {
         JavaFutureAction<?> futureAction = list.get(0);
         if (futureAction.isDone()) {
-          jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.SUCCEEDED);
+          boolean futureSucceed = true;
+          try {
+            futureAction.get();
+          } catch (Exception e) {
+            LOG.error("Failed to run job " + sparkJobId, e);
+            futureSucceed = false;
+          }
+          jobInfo = getDefaultJobInfo(sparkJobId,
+            futureSucceed ? JobExecutionStatus.SUCCEEDED : JobExecutionStatus.FAILED);
         }
       }
     }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Wed Feb 4 21:32:34 2015
@@ -316,7 +316,9 @@ public class Vectorizer implements Physi
       for (BaseWork baseWork : sparkWork.getAllWork()) {
         if (baseWork instanceof MapWork) {
           convertMapWork((MapWork) baseWork, false);
-        } else if (baseWork instanceof ReduceWork) {
+        } else if (baseWork instanceof ReduceWork
+            && HiveConf.getBoolVar(pctx.getConf(),
+                HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) {
           convertReduceWork((ReduceWork) baseWork);
         }
       }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java Wed Feb 4 21:32:34 2015
@@ -1,20 +1,20 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hive.ql.optimizer.spark;

 import java.util.List;
@@ -36,8 +36,8 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;

 /**
-* Operator factory for Spark SMBJoin processing.
-*/
+ * Operator factory for Spark SMBJoin processing.
+ */
 public final class SparkSortMergeJoinFactory {

   private SparkSortMergeJoinFactory() {
@@ -45,131 +45,79 @@ public final class SparkSortMergeJoinFac
   }

   /**
-   * Get the branch on which we are invoked (walking) from. See diagram below.
-   * We are at the SMBJoinOp and could have come from TS of any of the input tables.
-   */
-  public static int getPositionParent(SMBMapJoinOperator op,
-      Stack<Node> stack) {
-    int size = stack.size();
-    assert size >= 2 && stack.get(size - 1) == op;
-    @SuppressWarnings("unchecked")
-    Operator<? extends OperatorDesc> parent =
-        (Operator<? extends OperatorDesc>) stack.get(size - 2);
-    List<Operator<? extends OperatorDesc>> parOp = op.getParentOperators();
-    int pos = parOp.indexOf(parent);
-    return pos;
-  }
-
-  /**
-   * SortMergeMapJoin processor, input is a SMBJoinOp that is part of a MapWork:
-   *
-   *  MapWork:
-   *
-   *   (Big)   (Small)  (Small)
-   *    TS       TS       TS
-   *     \       |       /
-   *       \     DS     DS
-   *         \   |    /
-   *          SMBJoinOP
+   * Annotate MapWork, input is a SMBJoinOp that is part of a MapWork, and its root TS operator.
    *
    * 1. Initializes the MapWork's aliasToWork, pointing to big-table's TS.
    * 2. Adds the bucketing information to the MapWork.
    * 3. Adds localwork to the MapWork, with localWork's aliasToWork pointing to small-table's TS.
+   * @param context proc walker context
+   * @param mapWork mapwork to annotate
+   * @param smbMapJoinOp SMB Map Join operator to get data
+   * @param ts Table Scan operator to get data
+   * @param local Whether ts is from a 'local' source (small-table that will be loaded by SMBJoin 'local' task)
    */
-  private static class SortMergeJoinProcessor implements NodeProcessor {
+  public static void annotateMapWork(GenSparkProcContext context, MapWork mapWork,
+    SMBMapJoinOperator smbMapJoinOp, TableScanOperator ts, boolean local)
+    throws SemanticException {
+    initSMBJoinPlan(context, mapWork, ts, local);
+    setupBucketMapJoinInfo(mapWork, smbMapJoinOp);
+  }

-    public static void setupBucketMapJoinInfo(MapWork plan, SMBMapJoinOperator currMapJoinOp) {
-      if (currMapJoinOp != null) {
-        Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
-            currMapJoinOp.getConf().getAliasBucketFileNameMapping();
-        if (aliasBucketFileNameMapping != null) {
-          MapredLocalWork localPlan = plan.getMapRedLocalWork();
-          if (localPlan == null) {
-            localPlan = currMapJoinOp.getConf().getLocalWork();
-          } else {
-            // local plan is not null, we want to merge it into SMBMapJoinOperator's local work
-            MapredLocalWork smbLocalWork = currMapJoinOp.getConf().getLocalWork();
-            if (smbLocalWork != null) {
-              localPlan.getAliasToFetchWork().putAll(smbLocalWork.getAliasToFetchWork());
-              localPlan.getAliasToWork().putAll(smbLocalWork.getAliasToWork());
-            }
+  private static void setupBucketMapJoinInfo(MapWork plan, SMBMapJoinOperator currMapJoinOp) {
+    if (currMapJoinOp != null) {
+      Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
+          currMapJoinOp.getConf().getAliasBucketFileNameMapping();
+      if (aliasBucketFileNameMapping != null) {
+        MapredLocalWork localPlan = plan.getMapRedLocalWork();
+        if (localPlan == null) {
+          localPlan = currMapJoinOp.getConf().getLocalWork();
+        } else {
+          // local plan is not null, we want to merge it into SMBMapJoinOperator's local work
+          MapredLocalWork smbLocalWork = currMapJoinOp.getConf().getLocalWork();
+          if (smbLocalWork != null) {
+            localPlan.getAliasToFetchWork().putAll(smbLocalWork.getAliasToFetchWork());
+            localPlan.getAliasToWork().putAll(smbLocalWork.getAliasToWork());
           }
+        }

-          if (localPlan == null) {
-            return;
-          }
-          plan.setMapRedLocalWork(null);
-          currMapJoinOp.getConf().setLocalWork(localPlan);
+        if (localPlan == null) {
+          return;
+        }
+        plan.setMapRedLocalWork(null);
+        currMapJoinOp.getConf().setLocalWork(localPlan);

-          BucketMapJoinContext bucketMJCxt = new BucketMapJoinContext();
-          localPlan.setBucketMapjoinContext(bucketMJCxt);
-          bucketMJCxt.setAliasBucketFileNameMapping(aliasBucketFileNameMapping);
-          bucketMJCxt.setBucketFileNameMapping(
-              currMapJoinOp.getConf().getBigTableBucketNumMapping());
-          localPlan.setInputFileChangeSensitive(true);
-          bucketMJCxt.setMapJoinBigTableAlias(currMapJoinOp.getConf().getBigTableAlias());
-          bucketMJCxt
-              .setBucketMatcherClass(org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class);
-          bucketMJCxt.setBigTablePartSpecToFileMapping(
-              currMapJoinOp.getConf().getBigTablePartSpecToFileMapping());
+        BucketMapJoinContext bucketMJCxt = new BucketMapJoinContext();
+        localPlan.setBucketMapjoinContext(bucketMJCxt);
+        bucketMJCxt.setAliasBucketFileNameMapping(aliasBucketFileNameMapping);
+        bucketMJCxt.setBucketFileNameMapping(
+            currMapJoinOp.getConf().getBigTableBucketNumMapping());
+        localPlan.setInputFileChangeSensitive(true);
+        bucketMJCxt.setMapJoinBigTableAlias(currMapJoinOp.getConf().getBigTableAlias());
+        bucketMJCxt
+            .setBucketMatcherClass(org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class);
+        bucketMJCxt.setBigTablePartSpecToFileMapping(
+            currMapJoinOp.getConf().getBigTablePartSpecToFileMapping());

-          plan.setUseBucketizedHiveInputFormat(true);
+        plan.setUseBucketizedHiveInputFormat(true);

-        }
       }
     }
+  }

-    /**
-     * Initialize the mapWork.
-     *
-     * @param opProcCtx
-     *          processing context
-     */
-    private static void initSMBJoinPlan(MapWork mapWork,
-        GenSparkProcContext opProcCtx, boolean local)
-        throws SemanticException {
-      TableScanOperator ts = (TableScanOperator) opProcCtx.currentRootOperator;
-      String currAliasId = findAliasId(opProcCtx, ts);
-      GenMapRedUtils.setMapWork(mapWork, opProcCtx.parseContext,
-          opProcCtx.inputs, null, ts, currAliasId, opProcCtx.conf, local);
-    }
+  private static void initSMBJoinPlan(GenSparkProcContext opProcCtx,
+    MapWork mapWork, TableScanOperator currentRootOperator, boolean local)
+    throws SemanticException {
+    String currAliasId = findAliasId(opProcCtx, currentRootOperator);
+    GenMapRedUtils.setMapWork(mapWork, opProcCtx.parseContext,
+      opProcCtx.inputs, null, currentRootOperator, currAliasId, opProcCtx.conf, local);
+  }

-    private static String findAliasId(GenSparkProcContext opProcCtx, TableScanOperator ts) {
-      for (String alias : opProcCtx.topOps.keySet()) {
-        if (opProcCtx.topOps.get(alias) == ts) {
-          return alias;
-        }
+  private static String findAliasId(GenSparkProcContext opProcCtx, TableScanOperator ts) {
+    for (String alias : opProcCtx.topOps.keySet()) {
+      if (opProcCtx.topOps.get(alias) == ts) {
+        return alias;
       }
-      return null;
     }
-
-    /**
-     * 1. Initializes the MapWork's aliasToWork, pointing to big-table's TS.
-     * 2. Adds the bucketing information to the MapWork.
-     * 3. Adds localwork to the MapWork, with localWork's aliasToWork pointing to small-table's TS.
-     */
-    @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
-      SMBMapJoinOperator mapJoin = (SMBMapJoinOperator) nd;
-      GenSparkProcContext ctx = (GenSparkProcContext) procCtx;
-
-      // find the branch on which this processor was invoked
-      int pos = getPositionParent(mapJoin, stack);
-      boolean local = pos != mapJoin.getConf().getPosBigTable();
-
-      MapWork mapWork = ctx.smbJoinWorkMap.get(mapJoin);
-      initSMBJoinPlan(mapWork, ctx, local);
-
-      // find the associated mapWork that contains this processor.
-      setupBucketMapJoinInfo(mapWork, mapJoin);
-
-      // local aliases need not to hand over context further
-      return false;
-    }
-  }
-
-  public static NodeProcessor getTableScanMapJoin() {
-    return new SortMergeJoinProcessor();
+    return null;
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java Wed Feb 4 21:32:34 2015
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.exec.Ta
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -280,6 +281,10 @@ public abstract class TaskCompiler {
       for (ExecDriver tsk : mrTasks) {
         tsk.setRetryCmdWhenFail(true);
       }
+      List<SparkTask> sparkTasks = Utilities.getSparkTasks(rootTasks);
+      for (SparkTask sparkTask : sparkTasks) {
+        sparkTask.setRetryCmdWhenFail(true);
+      }
     }

     Interner<TableDesc> interner = Interners.newStrongInterner();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java Wed Feb 4 21:32:34 2015
@@ -36,7 +36,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
-import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -44,6 +43,7 @@ import org.apache.hadoop.hive.ql.plan.Sp
 import org.apache.hadoop.hive.ql.plan.SparkWork;

 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
@@ -103,8 +103,8 @@ public class GenSparkProcContext impleme
   // map that says which mapjoin belongs to which work item
   public final Map<MapJoinOperator, List<BaseWork>> mapJoinWorkMap;

-  // a map to keep track of which MapWork item holds which SMBMapJoinOp
-  public final Map<SMBMapJoinOperator, MapWork> smbJoinWorkMap;
+  // Map to keep track of which SMB Join operators and their information to annotate their MapWork with.
+  public final Map<SMBMapJoinOperator, SparkSMBMapJoinInfo> smbMapJoinCtxMap;

   // a map to keep track of which root generated which work
   public final Map<Operator<?>, BaseWork> rootToWorkMap;
@@ -160,7 +160,7 @@ public class GenSparkProcContext impleme
         new LinkedHashMap<ReduceSinkOperator, ObjectPair<SparkEdgeProperty, ReduceWork>>();
     this.linkOpWithWorkMap = new LinkedHashMap<Operator<?>, Map<BaseWork, SparkEdgeProperty>>();
     this.linkWorkWithReduceSinkMap = new LinkedHashMap<BaseWork, List<ReduceSinkOperator>>();
-    this.smbJoinWorkMap = new LinkedHashMap<SMBMapJoinOperator, MapWork>();
+    this.smbMapJoinCtxMap = new HashMap<SMBMapJoinOperator, SparkSMBMapJoinInfo>();
     this.mapJoinWorkMap = new LinkedHashMap<MapJoinOperator, List<BaseWork>>();
     this.rootToWorkMap = new LinkedHashMap<Operator<?>, BaseWork>();
     this.childToWorkMap = new LinkedHashMap<Operator<?>, List<BaseWork>>();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java Wed Feb 4 21:32:34 2015
@@ -41,10 +41,12 @@ import org.apache.hadoop.hive.ql.exec.Ha
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -443,6 +445,25 @@ public class GenSparkUtils {
     return null;
   }

+  /**
+   * Fill MapWork with 'local' work and bucket information for SMB Join.
+   * @param context context, containing references to MapWorks and their SMB information.
+   * @throws SemanticException
+   */
+  public void annotateMapWork(GenSparkProcContext context) throws SemanticException {
+    for (SMBMapJoinOperator smbMapJoinOp : context.smbMapJoinCtxMap.keySet()) {
+      //initialize mapwork with smbMapJoin information.
+      SparkSMBMapJoinInfo smbMapJoinInfo = context.smbMapJoinCtxMap.get(smbMapJoinOp);
+      MapWork work = smbMapJoinInfo.mapWork;
+      SparkSortMergeJoinFactory.annotateMapWork(context, work, smbMapJoinOp,
+        (TableScanOperator) smbMapJoinInfo.bigTableRootOp, false);
+      for (Operator<?> smallTableRootOp : smbMapJoinInfo.smallTableRootOps) {
+        SparkSortMergeJoinFactory.annotateMapWork(context, work, smbMapJoinOp,
+          (TableScanOperator) smallTableRootOp, true);
+      }
+    }
+  }
+
   public synchronized int getNextSeqNumber() {
     return ++sequenceNumber;
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java Wed Feb 4 21:32:34 2015
@@ -34,10 +34,12 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -118,18 +120,12 @@ public class GenSparkWork implements Nod
     } else {
       // create a new vertex
       if (context.preceedingWork == null) {
-        if (smbOp != null) {
-          // This logic is for SortMergeBucket MapJoin case.
-          // This MapWork (of big-table, see above..) is later initialized by SparkMapJoinFactory
-          // processor, so don't initialize it here. Just keep track of it in the context,
-          // for later processing.
-          work = utils.createMapWork(context, root, sparkWork, null, true);
-          if (context.smbJoinWorkMap.get(smbOp) != null) {
-            throw new SemanticException("Each SMBMapJoin should be associated only with one Mapwork");
-          }
-          context.smbJoinWorkMap.put(smbOp, (MapWork) work);
-        } else {
+        if (smbOp == null) {
           work = utils.createMapWork(context, root, sparkWork, null);
+        } else {
+          //save work to be initialized later with SMB information.
+          work = utils.createMapWork(context, root, sparkWork, null, true);
+          context.smbMapJoinCtxMap.get(smbOp).mapWork = (MapWork) work;
         }
       } else {
         work = utils.createReduceWork(context, root, sparkWork);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java?rev=1657405&r1=1657404&r2=1657405&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java Wed Feb 4 21:32:34 2015
@@ -186,17 +186,30 @@ public class SparkCompiler extends TaskC
      *
      * Some of the other processors are expecting only one traversal beyond SMBJoinOp.
* We need to traverse from the big-table path only, and stop traversing on the small-table path once we reach SMBJoinOp. + * Also add some SMB join information to the context, so we can properly annotate the MapWork later on. */ opRules.put(new TypeRule(SMBMapJoinOperator.class), new NodeProcessor() { @Override public Object process(Node currNode, Stack<Node> stack, NodeProcessorCtx procCtx, Object... os) throws SemanticException { + GenSparkProcContext context = (GenSparkProcContext) procCtx; + SMBMapJoinOperator currSmbNode = (SMBMapJoinOperator) currNode; + SparkSMBMapJoinInfo smbMapJoinCtx = context.smbMapJoinCtxMap.get(currSmbNode); + if (smbMapJoinCtx == null) { + smbMapJoinCtx = new SparkSMBMapJoinInfo(); + context.smbMapJoinCtxMap.put(currSmbNode, smbMapJoinCtx); + } + for (Node stackNode : stack) { if (stackNode instanceof DummyStoreOperator) { + //If coming from small-table side, do some book-keeping, and skip traversal. + smbMapJoinCtx.smallTableRootOps.add(context.currentRootOperator); return true; } } + //If coming from big-table side, do some book-keeping, and continue traversal + smbMapJoinCtx.bigTableRootOp = context.currentRootOperator; return false; } } @@ -210,24 +223,14 @@ public class SparkCompiler extends TaskC GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx); ogw.startWalking(topNodes, null); - - // ------------------- Second Pass ----------------------- - // SMB Join optimizations to add the "localWork" and bucketing data structures to MapWork. - opRules.clear(); - opRules.put(new TypeRule(SMBMapJoinOperator.class), - SparkSortMergeJoinFactory.getTableScanMapJoin()); - - disp = new DefaultRuleDispatcher(null, opRules, procCtx); - topNodes = new ArrayList<Node>(); - topNodes.addAll(pCtx.getTopOps().values()); - ogw = new GenSparkWorkWalker(disp, procCtx); - ogw.startWalking(topNodes, null); - // we need to clone some operator plans and remove union operators still for (BaseWork w: procCtx.workWithUnionOperators) { GenSparkUtils.getUtils().removeUnionOperators(conf, procCtx, w); } + // we need to fill MapWork with 'local' work and bucket information for SMB Join. 
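Before the annotateMapWork(procCtx) call that follows, one orientation note on the new SMB bookkeeping: SparkSMBMapJoinInfo itself is copied unchanged from the spark branch, so its body does not appear in this diff. The sketch below is only an inference from the three members the patch reads and writes (bigTableRootOp, smallTableRootOps, mapWork); the real class may declare them differently.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.plan.MapWork;

// Sketch only: a plain holder for the per-SMBMapJoinOperator state that the
// SparkCompiler rule and GenSparkWork above fill in, and that
// GenSparkUtils.annotateMapWork later consumes.
public class SparkSMBMapJoinInfo {
  // root TableScanOperator reached from the big-table path
  Operator<?> bigTableRootOp;
  // root operators reached from the small-table (DummyStoreOperator) paths
  List<Operator<?>> smallTableRootOps = new ArrayList<Operator<?>>();
  // MapWork created for the big-table root, annotated later with local work and bucket info
  MapWork mapWork;
}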
+ GenSparkUtils.getUtils().annotateMapWork(procCtx); + // finally make sure the file sink operators are set up right for (FileSinkOperator fileSink: procCtx.fileSinkSet) { GenSparkUtils.getUtils().processFileSink(procCtx, fileSink); Modified: hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1657405&r1=1657404&r2=1657405&view=diff ============================================================================== --- hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original) +++ hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Wed Feb 4 21:32:34 2015 @@ -232,6 +232,12 @@ public class Hadoop20SShims extends Hado throw new IOException("Cannot run tez on current hadoop, Version: " + VersionInfo.getVersion()); } + @Override + public MiniMrShim getMiniSparkCluster(Configuration conf, int numberOfTaskTrackers, + String nameNode, int numDir) throws IOException { + throw new IOException("Cannot run Spark on YARN on current Hadoop, Version: " + VersionInfo.getVersion()); + } + /** * Shim for MiniMrCluster */ Modified: hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1657405&r1=1657404&r2=1657405&view=diff ============================================================================== --- hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original) +++ hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Wed Feb 4 21:32:34 2015 @@ -415,6 +415,73 @@ public class Hadoop23Shims extends Hadoo } } + /** + * Returns a shim to wrap MiniSparkOnYARNCluster + */ + @Override + public MiniMrShim getMiniSparkCluster(Configuration conf, int numberOfTaskTrackers, + String nameNode, int numDir) throws IOException { + return new MiniSparkShim(conf, numberOfTaskTrackers, nameNode, numDir); + } + + /** + * Shim for MiniSparkOnYARNCluster + */ + public class MiniSparkShim extends Hadoop23Shims.MiniMrShim { + + private final MiniSparkOnYARNCluster mr; + private final Configuration conf; + + public MiniSparkShim(Configuration conf, int numberOfTaskTrackers, + String nameNode, int numDir) throws IOException { + + mr = new MiniSparkOnYARNCluster("sparkOnYarn"); + conf.set("fs.defaultFS", nameNode); + mr.init(conf); + mr.start(); + this.conf = mr.getConfig(); + } + + @Override + public int getJobTrackerPort() throws UnsupportedOperationException { + String address = conf.get("yarn.resourcemanager.address"); + address = StringUtils.substringAfterLast(address, ":"); + + if (StringUtils.isBlank(address)) { + throw new IllegalArgumentException("Invalid YARN resource manager port."); + } + + return Integer.parseInt(address); + } + + @Override + public void shutdown() throws IOException { + mr.stop(); + } + + @Override + public void setupConfiguration(Configuration conf) { + Configuration config = mr.getConfig(); + for (Map.Entry<String, String> pair : config) { + conf.set(pair.getKey(), pair.getValue()); + } + + Path jarPath = new Path("hdfs:///user/hive"); + Path hdfsPath = new Path("hdfs:///user/"); + try { + FileSystem fs = cluster.getFileSystem(); + jarPath = fs.makeQualified(jarPath); + conf.set("hive.jar.directory", jarPath.toString()); + fs.mkdirs(jarPath); + hdfsPath = 
fs.makeQualified(hdfsPath); + conf.set("hive.user.install.directory", hdfsPath.toString()); + fs.mkdirs(hdfsPath); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + // Don't move this code to the parent class. There's a binary // incompatibility between hadoop 1 and 2 wrt MiniDFSCluster and we // need to have two different shim classes even though they are Modified: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1657405&r1=1657404&r2=1657405&view=diff ============================================================================== --- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original) +++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Wed Feb 4 21:32:34 2015 @@ -93,7 +93,10 @@ public interface HadoopShims { String nameNode, int numDir) throws IOException; public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers, - String nameNode, int numDir) throws IOException; + String nameNode, int numDir) throws IOException; + + public MiniMrShim getMiniSparkCluster(Configuration conf, int numberOfTaskTrackers, + String nameNode, int numDir) throws IOException; /** * Shim for MiniMrCluster Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java?rev=1657405&r1=1657404&r2=1657405&view=diff ============================================================================== --- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java (original) +++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java Wed Feb 4 21:32:34 2015 @@ -53,4 +53,9 @@ public interface JobContext { */ Map<String, List<JavaFutureAction<?>>> getMonitoredJobs(); + /** + * Return all jar paths that were added through AddJarJob.
+ */ + List<String> getAddedJars(); + } Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java?rev=1657405&r1=1657404&r2=1657405&view=diff ============================================================================== --- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java (original) +++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java Wed Feb 4 21:32:34 2015 @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.hive.spark.counter.SparkCounters; @@ -32,11 +33,13 @@ class JobContextImpl implements JobConte private final JavaSparkContext sc; private final ThreadLocal<MonitorCallback> monitorCb; private final Map<String, List<JavaFutureAction<?>>> monitoredJobs; + private final List<String> addedJars; public JobContextImpl(JavaSparkContext sc) { this.sc = sc; this.monitorCb = new ThreadLocal<MonitorCallback>(); monitoredJobs = new ConcurrentHashMap<String, List<JavaFutureAction<?>>>(); + addedJars = new CopyOnWriteArrayList<String>(); } @@ -57,6 +60,11 @@ class JobContextImpl implements JobConte return monitoredJobs; } + @Override + public List<String> getAddedJars() { + return addedJars; + } + void setMonitorCb(MonitorCallback cb) { monitorCb.set(cb); } Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java?rev=1657405&r1=1657404&r2=1657405&view=diff ============================================================================== --- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java (original) +++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java Wed Feb 4 21:32:34 2015 @@ -106,6 +106,8 @@ public class RemoteDriver { serverAddress = getArg(args, idx); } else if (key.equals("--remote-port")) { serverPort = Integer.parseInt(getArg(args, idx)); + } else if (key.equals("--client-id")) { + conf.set(SparkClientFactory.CONF_CLIENT_ID, getArg(args, idx)); } else if (key.equals("--secret")) { conf.set(SparkClientFactory.CONF_KEY_SECRET, getArg(args, idx)); } else if (key.equals("--conf")) { @@ -127,6 +129,8 @@ public class RemoteDriver { LOG.debug("Remote Driver configured with: " + e._1() + "=" + e._2()); } + String clientId = mapConf.get(SparkClientFactory.CONF_CLIENT_ID); + Preconditions.checkArgument(clientId != null, "No client ID provided."); String secret = mapConf.get(SparkClientFactory.CONF_KEY_SECRET); Preconditions.checkArgument(secret != null, "No secret provided."); @@ -140,8 +144,8 @@ public class RemoteDriver { this.protocol = new DriverProtocol(); // The RPC library takes care of timing out this. 
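Stepping back briefly to the JobContext/JobContextImpl change above before the RemoteDriver hunk continues: jars are appended once per AddJarJob but may be read later from other job threads, which is presumably why the backing list is a CopyOnWriteArrayList. As a hedged illustration of a possible consumer (the helper class and method below are invented for this example and are not part of the patch), a later remote job could make the registered jars visible on its own thread roughly like this, assuming the recorded paths resolve to local files:

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;

import org.apache.hive.spark.client.JobContext;

// Illustrative helper, not part of r1657405.
public final class AddedJarsHelper {
  private AddedJarsHelper() {}

  // Expose every jar recorded via JobContext.getAddedJars() to the current thread.
  public static void exposeAddedJars(JobContext jc) throws Exception {
    List<String> jars = jc.getAddedJars();
    URL[] urls = new URL[jars.size()];
    for (int i = 0; i < jars.size(); i++) {
      urls[i] = new File(jars.get(i)).toURI().toURL();
    }
    Thread.currentThread().setContextClassLoader(
        new URLClassLoader(urls, Thread.currentThread().getContextClassLoader()));
  }
}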
- this.clientRpc = Rpc.createClient(mapConf, egroup, serverAddress, serverPort, secret, protocol) - .get(); + this.clientRpc = Rpc.createClient(mapConf, egroup, serverAddress, serverPort, + clientId, secret, protocol).get(); this.running = true; this.clientRpc.addListener(new Rpc.Listener() { @@ -354,6 +358,11 @@ public class RemoteDriver { if (sparkCounters != null) { counters = sparkCounters.snapshot(); } + // make sure job has really succeeded + // at this point, future.get shall not block us + for (JavaFutureAction<?> future : jobs) { + future.get(); + } protocol.jobFinished(req.id, result, null, counters); } catch (Throwable t) { // Catch throwables in a best-effort to report job status back to the client. It's Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java?rev=1657405&r1=1657404&r2=1657405&view=diff ============================================================================== --- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java (original) +++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java Wed Feb 4 21:32:34 2015 @@ -37,6 +37,9 @@ public final class SparkClientFactory { /** Used to run the driver in-process, mostly for testing. */ static final String CONF_KEY_IN_PROCESS = "spark.client.do_not_use.run_driver_in_process"; + /** Used by client and driver to share a client ID for establishing an RPC session. */ + static final String CONF_CLIENT_ID = "spark.client.authentication.client_id"; + /** Used by client and driver to share a secret for establishing an RPC session. */ static final String CONF_KEY_SECRET = "spark.client.authentication.secret"; Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1657405&r1=1657404&r2=1657405&view=diff ============================================================================== --- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original) +++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Wed Feb 4 21:32:34 2015 @@ -64,9 +64,11 @@ class SparkClientImpl implements SparkCl private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds + private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS"; private static final String DRIVER_OPTS_KEY = "spark.driver.extraJavaOptions"; private static final String EXECUTOR_OPTS_KEY = "spark.executor.extraJavaOptions"; private static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath"; + private static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath"; private final Map<String, String> conf; private final HiveConf hiveConf; @@ -83,13 +85,14 @@ class SparkClientImpl implements SparkCl this.childIdGenerator = new AtomicInteger(); this.jobs = Maps.newConcurrentMap(); + String clientId = UUID.randomUUID().toString(); String secret = rpcServer.createSecret(); - this.driverThread = startDriver(rpcServer, secret); + this.driverThread = startDriver(rpcServer, clientId, secret); this.protocol = new ClientProtocol(); try { // The RPC server will take care of timeouts here. 
- this.driverRpc = rpcServer.registerClient(secret, protocol).get(); + this.driverRpc = rpcServer.registerClient(clientId, secret, protocol).get(); } catch (Exception e) { LOG.warn("Error while waiting for client to connect.", e); driverThread.interrupt(); @@ -173,7 +176,8 @@ class SparkClientImpl implements SparkCl protocol.cancel(jobId); } - private Thread startDriver(RpcServer rpcServer, final String secret) throws IOException { + private Thread startDriver(RpcServer rpcServer, final String clientId, final String secret) + throws IOException { Runnable runnable; final String serverAddress = rpcServer.getAddress(); final String serverPort = String.valueOf(rpcServer.getPort()); @@ -189,6 +193,8 @@ class SparkClientImpl implements SparkCl args.add(serverAddress); args.add("--remote-port"); args.add(serverPort); + args.add("--client-id"); + args.add(clientId); args.add("--secret"); args.add(secret); @@ -219,10 +225,16 @@ class SparkClientImpl implements SparkCl sparkLogDir = sparkHome + "/logs/"; } } + + String osxTestOpts = ""; + if (Strings.nullToEmpty(System.getProperty("os.name")).toLowerCase().contains("mac")) { + osxTestOpts = Strings.nullToEmpty(System.getenv(OSX_TEST_OPTS)); + } + String driverJavaOpts = Joiner.on(" ").skipNulls().join( - "-Dhive.spark.log.dir=" + sparkLogDir, conf.get(DRIVER_OPTS_KEY)); + "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(DRIVER_OPTS_KEY)); String executorJavaOpts = Joiner.on(" ").skipNulls().join( - "-Dhive.spark.log.dir=" + sparkLogDir, conf.get(EXECUTOR_OPTS_KEY)); + "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(EXECUTOR_OPTS_KEY)); // Create a file with all the job properties to be read by spark-submit. Change the // file's permissions so that only the owner can read it. This avoid having the @@ -236,18 +248,30 @@ class SparkClientImpl implements SparkCl for (Map.Entry<String, String> e : conf.entrySet()) { allProps.put(e.getKey(), conf.get(e.getKey())); } + allProps.put(SparkClientFactory.CONF_CLIENT_ID, clientId); allProps.put(SparkClientFactory.CONF_KEY_SECRET, secret); allProps.put(DRIVER_OPTS_KEY, driverJavaOpts); allProps.put(EXECUTOR_OPTS_KEY, executorJavaOpts); - String hiveHadoopTestClasspath = Strings.nullToEmpty(System.getenv("HIVE_HADOOP_TEST_CLASSPATH")); - if (!hiveHadoopTestClasspath.isEmpty()) { - String extraClasspath = Strings.nullToEmpty((String)allProps.get(DRIVER_EXTRA_CLASSPATH)); - if (extraClasspath.isEmpty()) { - allProps.put(DRIVER_EXTRA_CLASSPATH, hiveHadoopTestClasspath); - } else { - extraClasspath = extraClasspath.endsWith(File.pathSeparator) ? extraClasspath : extraClasspath + File.pathSeparator; - allProps.put(DRIVER_EXTRA_CLASSPATH, extraClasspath + hiveHadoopTestClasspath); + String isTesting = conf.get("spark.testing"); + if (isTesting != null && isTesting.equalsIgnoreCase("true")) { + String hiveHadoopTestClasspath = Strings.nullToEmpty(System.getenv("HIVE_HADOOP_TEST_CLASSPATH")); + if (!hiveHadoopTestClasspath.isEmpty()) { + String extraDriverClasspath = Strings.nullToEmpty((String)allProps.get(DRIVER_EXTRA_CLASSPATH)); + if (extraDriverClasspath.isEmpty()) { + allProps.put(DRIVER_EXTRA_CLASSPATH, hiveHadoopTestClasspath); + } else { + extraDriverClasspath = extraDriverClasspath.endsWith(File.pathSeparator) ? 
extraDriverClasspath : extraDriverClasspath + File.pathSeparator; + allProps.put(DRIVER_EXTRA_CLASSPATH, extraDriverClasspath + hiveHadoopTestClasspath); + } + + String extraExecutorClasspath = Strings.nullToEmpty((String)allProps.get(EXECUTOR_EXTRA_CLASSPATH)); + if (extraExecutorClasspath.isEmpty()) { + allProps.put(EXECUTOR_EXTRA_CLASSPATH, hiveHadoopTestClasspath); + } else { + extraExecutorClasspath = extraExecutorClasspath.endsWith(File.pathSeparator) ? extraExecutorClasspath : extraExecutorClasspath + File.pathSeparator; + allProps.put(EXECUTOR_EXTRA_CLASSPATH, extraExecutorClasspath + hiveHadoopTestClasspath); + } } } @@ -350,6 +374,9 @@ class SparkClientImpl implements SparkCl LOG.debug("Running client driver with argv: {}", Joiner.on(" ").join(argv)); ProcessBuilder pb = new ProcessBuilder(argv.toArray(new String[argv.size()])); + if (isTesting != null) { + pb.environment().put("SPARK_TESTING", isTesting); + } final Process child = pb.start(); int childId = childIdGenerator.incrementAndGet(); @@ -529,6 +556,9 @@ class SparkClientImpl implements SparkCl @Override public Serializable call(JobContext jc) throws Exception { jc.sc().addJar(path); + // Following remote job may refer to classes in this jar, and the remote job would be executed + // in a different thread, so we add this jar path to JobContext for further usage. + jc.getAddedJars().add(path); return null; } Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java?rev=1657405&r1=1657404&r2=1657405&view=diff ============================================================================== --- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java (original) +++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java Wed Feb 4 21:32:34 2015 @@ -18,6 +18,7 @@ package org.apache.hive.spark.client.rpc; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; @@ -63,9 +64,12 @@ class KryoMessageCodec extends ByteToMes } }; + private volatile EncryptionHandler encryptionHandler; + public KryoMessageCodec(int maxMessageSize, Class<?>... 
messages) { this.maxMessageSize = maxMessageSize; this.messages = Arrays.asList(messages); + this.encryptionHandler = null; } @Override @@ -86,7 +90,7 @@ class KryoMessageCodec extends ByteToMes } try { - ByteBuffer nioBuffer = in.nioBuffer(in.readerIndex(), msgSize); + ByteBuffer nioBuffer = maybeDecrypt(in.nioBuffer(in.readerIndex(), msgSize)); Input kryoIn = new Input(new ByteBufferInputStream(nioBuffer)); Object msg = kryos.get().readClassAndObject(kryoIn); @@ -106,7 +110,7 @@ class KryoMessageCodec extends ByteToMes kryos.get().writeClassAndObject(kryoOut, msg); kryoOut.flush(); - byte[] msgData = bytes.toByteArray(); + byte[] msgData = maybeEncrypt(bytes.toByteArray()); LOG.debug("Encoded message of type {} ({} bytes)", msg.getClass().getName(), msgData.length); checkSize(msgData.length); @@ -115,10 +119,56 @@ class KryoMessageCodec extends ByteToMes buf.writeBytes(msgData); } + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + if (encryptionHandler != null) { + encryptionHandler.dispose(); + } + super.channelInactive(ctx); + } + private void checkSize(int msgSize) { Preconditions.checkArgument(msgSize > 0, "Message size (%s bytes) must be positive.", msgSize); Preconditions.checkArgument(maxMessageSize <= 0 || msgSize <= maxMessageSize, "Message (%s bytes) exceeds maximum allowed size (%s bytes).", msgSize, maxMessageSize); } + private byte[] maybeEncrypt(byte[] data) throws Exception { + return (encryptionHandler != null) ? encryptionHandler.wrap(data, 0, data.length) : data; + } + + private ByteBuffer maybeDecrypt(ByteBuffer data) throws Exception { + if (encryptionHandler != null) { + byte[] encrypted; + int len = data.limit() - data.position(); + int offset; + if (data.hasArray()) { + encrypted = data.array(); + offset = data.position() + data.arrayOffset(); + data.position(data.limit()); + } else { + encrypted = new byte[len]; + offset = 0; + data.get(encrypted); + } + return ByteBuffer.wrap(encryptionHandler.unwrap(encrypted, offset, len)); + } else { + return data; + } + } + + void setEncryptionHandler(EncryptionHandler handler) { + this.encryptionHandler = handler; + } + + interface EncryptionHandler { + + byte[] wrap(byte[] data, int offset, int len) throws IOException; + + byte[] unwrap(byte[] data, int offset, int len) throws IOException; + + void dispose() throws IOException; + + } + } Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/README.md URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/README.md?rev=1657405&r1=1657404&r2=1657405&view=diff ============================================================================== --- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/README.md (original) +++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/README.md Wed Feb 4 21:32:34 2015 @@ -6,7 +6,7 @@ Basic flow of events: - Client side creates an RPC server - Client side spawns RemoteDriver, which manages the SparkContext, and provides a secret - Client side sets up a timer to wait for RemoteDriver to connect back -- RemoteDriver connects back to client, sends Hello message with secret +- RemoteDriver connects back to client, SASL handshake ensues - Connection is established and now there's a session between the client and the driver. Features of the RPC layer:
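Looking back at the KryoMessageCodec change above: this patch only introduces the EncryptionHandler hook and the maybeEncrypt()/maybeDecrypt() plumbing around it; the production handler is presumably supplied by the SASL handshake mentioned in the README change. As a hedged sketch of the contract, a trivial pass-through handler of the kind a same-package unit test might install via setEncryptionHandler() could look like this (class name invented for the example):

import java.io.IOException;
import java.util.Arrays;

// Editorial sketch: an identity EncryptionHandler. Because KryoMessageCodec and its
// nested interface are package-private, this class would have to live in
// org.apache.hive.spark.client.rpc.
class IdentityEncryptionHandler implements KryoMessageCodec.EncryptionHandler {

  @Override
  public byte[] wrap(byte[] data, int offset, int len) throws IOException {
    // "encrypt" by copying the requested slice unchanged
    return Arrays.copyOfRange(data, offset, offset + len);
  }

  @Override
  public byte[] unwrap(byte[] data, int offset, int len) throws IOException {
    // "decrypt" the same way
    return Arrays.copyOfRange(data, offset, offset + len);
  }

  @Override
  public void dispose() throws IOException {
    // nothing to release
  }
}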
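And a usage sketch for the new getMiniSparkCluster() entry point added to HadoopShims above (editorial illustration, not part of this commit): on the Hadoop 2 shim it wraps MiniSparkOnYARNCluster, while the Hadoop 1 shim simply throws IOException. The class name, node counts, and namenode URI below are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;

// Editorial sketch: driving the new mini Spark-on-YARN cluster shim from test code.
public class MiniSparkClusterExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    HadoopShims.MiniMrShim spark = ShimLoader.getHadoopShims()
        .getMiniSparkCluster(conf, 2 /* task trackers */, "hdfs://localhost:8020", 1 /* dirs */);
    spark.setupConfiguration(conf);  // copy the mini cluster's settings into the test conf
    try {
      // ... run Spark-on-YARN qtests against 'conf' ...
    } finally {
      spark.shutdown();
    }
  }
}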