[GitHub] [spark] maropu commented on a change in pull request #28676: [SPARK-31869][SQL] BroadcastHashJoinExec can utilize the build side for its output partitioning

2020-07-07 Thread GitBox


maropu commented on a change in pull request #28676:
URL: https://github.com/apache/spark/pull/28676#discussion_r451202429



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
##
@@ -415,6 +417,192 @@ abstract class BroadcastJoinSuiteBase extends QueryTest 
with SQLTestUtils
   assert(e.getMessage.contains(s"Could not execute broadcast in $timeout 
secs."))
 }
   }
+
+  test("broadcast join where streamed side's output partitioning is 
HashPartitioning") {
+withTable("t1", "t3") {
+  withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "500") {
+val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
+val df2 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i2", "j2")
+val df3 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i3", "j3")
+df1.write.format("parquet").bucketBy(8, "i1", "j1").saveAsTable("t1")
+df3.write.format("parquet").bucketBy(8, "i3", "j3").saveAsTable("t3")
+val t1 = spark.table("t1")
+val t3 = spark.table("t3")
+
+// join1 is a broadcast join where df2 is broadcasted. Note that 
output partitioning on the
+// streamed side (t1) is HashPartitioning (bucketed files).
+val join1 = t1.join(df2, t1("i1") === df2("i2") && t1("j1") === 
df2("j2"))
+val plan1 = join1.queryExecution.executedPlan
+assert(collect(plan1) { case e: ShuffleExchangeExec => e }.isEmpty)
+val broadcastJoins = collect(plan1) { case b: BroadcastHashJoinExec => 
b }
+assert(broadcastJoins.size == 1)
+broadcastJoins(0).outputPartitioning match {
+  case p: PartitioningCollection =>
+assert(p.partitionings.size == 4)
+// Verify all the combinations of output partitioning.
+Seq(Seq(t1("i1"), t1("j1")),
+  Seq(t1("i1"), df2("j2")),
+  Seq(df2("i2"), t1("j1")),
+  Seq(df2("i2"), df2("j2"))).foreach { expected =>
+  val expectedExpressions = expected.map(_.expr)
+  assert(p.partitionings.exists {
+case h: HashPartitioning => expressionsEqual(h.expressions, 
expectedExpressions)
+  })
+}
+  case _ => fail()
+}
+
+// Join on the column from the broadcasted side (i2, j2) and make sure 
output partitioning
+// is maintained by checking no shuffle exchange is introduced.
+val join2 = join1.join(t3, join1("i2") === t3("i3") && join1("j2") === 
t3("j3"))
+val plan2 = join2.queryExecution.executedPlan
+assert(collect(plan2) { case s: SortMergeJoinExec => s }.size == 1)
+assert(collect(plan2) { case b: BroadcastHashJoinExec => b }.size == 1)
+assert(collect(plan2) { case e: ShuffleExchangeExec => e }.isEmpty)
+
+// Validate the data with broadcast join off.
+withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+  val df = join1.join(t3, join1("i2") === t3("i3") && join1("j2") === 
t3("j3"))
+  QueryTest.sameRows(join2.collect().toSeq, df.collect().toSeq)
+}
+  }
+}
+  }
+
+  test("broadcast join where streamed side's output partitioning is 
PartitioningCollection") {
+withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "500") {
+  val t1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
+  val t2 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i2", "j2")
+  val t3 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i3", "j3")
+  val t4 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i4", "j4")
+
+  // join1 is a sort merge join (shuffle on the both sides).
+  val join1 = t1.join(t2, t1("i1") === t2("i2"))
+  val plan1 = join1.queryExecution.executedPlan
+  assert(collect(plan1) { case s: SortMergeJoinExec => s }.size == 1)
+  assert(collect(plan1) { case e: ShuffleExchangeExec => e }.size == 2)
+
+  // join2 is a broadcast join where t3 is broadcasted. Note that output 
partitioning on the
+  // streamed side (join1) is PartitioningCollection (sort merge join)
+  val join2 = join1.join(t3, join1("i1") === t3("i3"))
+  val plan2 = join2.queryExecution.executedPlan
+  assert(collect(plan2) { case s: SortMergeJoinExec => s }.size == 1)
+  assert(collect(plan2) { case e: ShuffleExchangeExec => e }.size == 2)
+  val broadcastJoins = collect(plan2) { case b: BroadcastHashJoinExec => b 
}
+  assert(broadcastJoins.size == 1)
+  broadcastJoins(0).outputPartitioning match {
+case p: PartitioningCollection =>
+  assert(p.partitionings.size == 3)
+  // Verify all the combinations of output partitioning.
+  Seq(Seq(t1("i1")), Seq(t2("i2")), Seq(t3("i3"))).foreach { expected 
=>
+val expectedExpressions = expected.map(_.expr)
+assert(p.partitionings.exists {
+  case h: HashPartitioning => expressionsEqual(h.expressions, 

[GitHub] [spark] maropu commented on a change in pull request #28676: [SPARK-31869][SQL] BroadcastHashJoinExec can utilize the build side for its output partitioning

2020-07-07 Thread GitBox


maropu commented on a change in pull request #28676:
URL: https://github.com/apache/spark/pull/28676#discussion_r451200682



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##
@@ -60,6 +62,66 @@ case class BroadcastHashJoinExec(
 }
   }
 
+  override def outputPartitioning: Partitioning = {
+joinType match {
+  case _: InnerLike =>
+streamedPlan.outputPartitioning match {
+  case h: HashPartitioning => 
PartitioningCollection(expandOutputPartitioning(h))
+  case c: PartitioningCollection =>
+def expand(partitioning: PartitioningCollection): Partitioning = {
+  PartitioningCollection(partitioning.partitionings.flatMap {
+case h: HashPartitioning => expandOutputPartitioning(h)
+case c: PartitioningCollection => Seq(expand(c))
+case other => Seq(other)
+  })
+}
+expand(c)
+  case other => other
+}
+  case _ => streamedPlan.outputPartitioning
+}
+  }
+
+  // An one-to-many mapping from a streamed key to build keys.
+  private lazy val streamedKeyToBuildKeyMapping = {
+val mapping = mutable.Map.empty[Expression, Seq[Expression]]
+streamedKeys.zip(buildKeys).foreach {
+  case (streamedKey, buildKey) =>
+val key = streamedKey.canonicalized
+mapping.get(key) match {
+  case Some(v) => mapping.put(key, v :+ buildKey)
+  case None => mapping.put(key, Seq(buildKey))
+}
+}
+mapping.toMap
+  }
+
+  // Expands the given partitioning by substituting streamed keys with build 
keys.
+  // For example, if the expressions for the given partitioning are Seq("a", 
"b", "c")
+  // where the streamed keys are Seq("b", "c") and the build keys are Seq("x", 
"y"),
+  // the expanded partitioning will have the following expressions:
+  // Seq("a", "b", "c"), Seq("a", "b", "y"), Seq("a", "x", "c"), Seq("a", "x", 
"y").
+  private def expandOutputPartitioning(partitioning: HashPartitioning): 
Seq[HashPartitioning] = {
+def generateExprCombinations(
+current: Seq[Expression],
+accumulated: Seq[Expression],
+all: mutable.ListBuffer[Seq[Expression]]): Unit = {

Review comment:
   Could you avoid passing a mutable collection as a method argument, and use a 
return value instead? e.g., 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala#L124-L133





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #28676: [SPARK-31869][SQL] BroadcastHashJoinExec can utilize the build side for its output partitioning

2020-07-07 Thread GitBox


maropu commented on a change in pull request #28676:
URL: https://github.com/apache/spark/pull/28676#discussion_r451198766



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##
@@ -60,6 +62,66 @@ case class BroadcastHashJoinExec(
 }
   }
 
+  override def outputPartitioning: Partitioning = {
+joinType match {
+  case _: InnerLike =>
+streamedPlan.outputPartitioning match {
+  case h: HashPartitioning => 
PartitioningCollection(expandOutputPartitioning(h))
+  case c: PartitioningCollection =>
+def expand(partitioning: PartitioningCollection): Partitioning = {

Review comment:
   Could you pull out this inner method and define it outside as private? 
Also, we need to assign a reasonable method name.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #28676: [SPARK-31869][SQL] BroadcastHashJoinExec can utilize the build side for its output partitioning

2020-07-07 Thread GitBox


maropu commented on a change in pull request #28676:
URL: https://github.com/apache/spark/pull/28676#discussion_r451198827



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##
@@ -60,6 +62,66 @@ case class BroadcastHashJoinExec(
 }
   }
 
+  override def outputPartitioning: Partitioning = {

Review comment:
   val or lazy val?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #28676: [SPARK-31869][SQL] BroadcastHashJoinExec can utilize the build side for its output partitioning

2020-06-30 Thread GitBox


maropu commented on a change in pull request #28676:
URL: https://github.com/apache/spark/pull/28676#discussion_r448120574



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##
@@ -60,6 +60,26 @@ case class BroadcastHashJoinExec(
 }
   }
 
+  override def outputPartitioning: Partitioning = {
+def buildKeys: Seq[Expression] = buildSide match {
+  case BuildLeft => leftKeys
+  case BuildRight => rightKeys
+}
+
+joinType match {
+  case _: InnerLike =>

Review comment:
   NVM, on second thought, it's difficult to handle this issue on that side.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org