Github user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/734#discussion_r12927830
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
@@ -142,6 +136,68 @@ case class HashJoin(
/**
* :: DeveloperApi ::
+ * Performs and inner hash join of two child relations by first shuffling
the data using the join
+ * keys.
+ */
+@DeveloperApi
+case class ShuffledHashJoin(
+ leftKeys: Seq[Expression],
+ rightKeys: Seq[Expression],
+ buildSide: BuildSide,
+ left: SparkPlan,
+ right: SparkPlan) extends BinaryNode with HashJoin {
+
+ override def outputPartitioning: Partitioning = left.outputPartitioning
+
+ override def requiredChildDistribution =
+ ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) ::
Nil
+
+
+ def execute() = {
+ buildPlan.execute().zipPartitions(streamedPlan.execute()) {
+ (buildIter, streamIter) => joinIterators(buildIter, streamIter)
+ }
+ }
+}
+
+
+/**
+ * :: DeveloperApi ::
+ * Performs an inner hash join of two child relations. When the operator
is constructed, a Spark
+ * job is asynchronously started to calculate the values for the
broadcasted relation. This data
+ * is then placed in a Spark broadcast variable. The streamed relation is
not shuffled.
+ */
+@DeveloperApi
+case class BroadcastHashJoin(
+ leftKeys: Seq[Expression],
+ rightKeys: Seq[Expression],
+ buildSide: BuildSide,
+ left: SparkPlan,
+ right: SparkPlan)(@transient sc: SparkContext) extends BinaryNode
with HashJoin {
+
+ override def otherCopyArgs = sc :: Nil
+
+ override def outputPartitioning: Partitioning = left.outputPartitioning
+
+ override def requiredChildDistribution =
+ UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
+
+ @transient
+ lazy val broadcastFuture = future {
+ sc.broadcast(buildPlan.executeCollect())
--- End diff --
In Spark 1.0, with the newly added garbage collection mechanism, when the
query plan itself goes out of scope, the broadcast variable should also be
cleaned automatically.
Another way we can do this is to have some query context object we pass
around the entire physical query plan which tracks the stuff we need to clean
up.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---