[GitHub] [spark] cloud-fan commented on a change in pull request #29277: [SPARK-32421][SQL] Add code-gen for shuffled hash join

2020-07-30 Thread GitBox


cloud-fan commented on a change in pull request #29277:
URL: https://github.com/apache/spark/pull/29277#discussion_r463417316



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
##
@@ -903,6 +904,10 @@ case class CollapseCodegenStages(
 // The children of SortMergeJoin should do codegen separately.
 j.withNewChildren(j.children.map(
   child => InputAdapter(insertWholeStageCodegen(child))))
+  case j: ShuffledHashJoinExec =>
+// The children of ShuffledHashJoin should do codegen separately.
+j.withNewChildren(j.children.map(

Review comment:
   There are more problems when several shuffled hash joins sit next to each 
other in the same stage. We would need to accumulate `CodegenSupport.inputRDDs` 
across them, but `WholeStageCodegenExec` only supports up to 2 input RDDs for now.
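
   To make the limit concrete, here is a minimal sketch in plain Scala with no 
Spark dependency. `ToyPlan`, `Source`, `FusedJoin` and `MaxStageInputs` are 
hypothetical names used only for illustration; the one fact taken from the 
comment is that a whole-stage-codegen stage accepts at most 2 input RDDs.

```scala
object InputRddLimitSketch {
  // Hypothetical stand-ins for physical plan nodes; these are not Spark's classes.
  sealed trait ToyPlan { def inputs: Seq[String] }

  // A leaf whose rows enter the stage as one input (think: one RDD).
  final case class Source(name: String) extends ToyPlan {
    def inputs: Seq[String] = Seq(name)
  }

  // A join compiled into the same stage as both of its children, so it
  // passes on every input that either side needs.
  final case class FusedJoin(streamed: ToyPlan, build: ToyPlan) extends ToyPlan {
    def inputs: Seq[String] = streamed.inputs ++ build.inputs
  }

  // Limit quoted in the review comment.
  val MaxStageInputs = 2

  def fitsInOneStage(plan: ToyPlan): Boolean = plan.inputs.size <= MaxStageInputs
}
```

   Under this model a single fused join needs two inputs and fits, while a 
second join stacked on top of it needs three and does not.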





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #29277: [SPARK-32421][SQL] Add code-gen for shuffled hash join

2020-07-30 Thread GitBox


cloud-fan commented on a change in pull request #29277:
URL: https://github.com/apache/spark/pull/29277#discussion_r463415144



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
##
@@ -903,6 +904,10 @@ case class CollapseCodegenStages(
 // The children of SortMergeJoin should do codegen separately.
 j.withNewChildren(j.children.map(
   child => InputAdapter(insertWholeStageCodegen(child))))
+  case j: ShuffledHashJoinExec =>
+// The children of ShuffledHashJoin should do codegen separately.
+j.withNewChildren(j.children.map(

Review comment:
   Makes sense to me. It's possible to make the input index dynamic, so that 
one join node uses `inputs[0]` and the other uses `inputs[1]`, but I don't have 
a good idea of how to do that right now. Let's leave it for future work.
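
   For illustration only, one way a dynamic input index could look is a small 
allocator that hands each join its own slot. `InputSlots` and `initStatement` 
are hypothetical helpers that do not appear in the PR; the sketch only shows 
the generated init statement changing with the slot, and says nothing about 
how the slots would be wired through `WholeStageCodegenExec`.

```scala
object DynamicInputIndexSketch {
  // Hypothetical helper; nothing like it exists in the diff quoted in this thread.
  final class InputSlots {
    private var next = 0
    // Hands out 0, 1, 2, ... in call order.
    def allocate(): Int = { val slot = next; next += 1; slot }
  }

  // Builds the init statement that the diff hard-codes as `inputs[0]`,
  // with the slot chosen by the caller instead.
  def initStatement(variable: String, slot: Int): String =
    s"$variable = inputs[$slot];"
}
```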








[GitHub] [spark] cloud-fan commented on a change in pull request #29277: [SPARK-32421][SQL] Add code-gen for shuffled hash join

2020-07-30 Thread GitBox


cloud-fan commented on a change in pull request #29277:
URL: https://github.com/apache/spark/pull/29277#discussion_r463382990



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
##
@@ -903,6 +904,10 @@ case class CollapseCodegenStages(
 // The children of SortMergeJoin should do codegen separately.
 j.withNewChildren(j.children.map(
   child => InputAdapter(insertWholeStageCodegen(child))))
+  case j: ShuffledHashJoinExec =>
+// The children of ShuffledHashJoin should do codegen separately.
+j.withNewChildren(j.children.map(

Review comment:
   It seems we only need to do this for the build side?
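
   A minimal sketch of what "build side only" could mean, using toy plan nodes 
rather than Spark's. `Scan`, `Boundary`, `ToyHashJoin` and `isolateBuildSide` 
are hypothetical; `BuildLeft` and `BuildRight` mirror names that appear in the 
diff but are redefined here so the example is self-contained. Whether this is 
actually safe is the open question being asked.

```scala
object BuildSideOnlySketch {
  sealed trait ToyPlan
  final case class Scan(name: String) extends ToyPlan

  // Marks a codegen boundary: the child would be compiled as its own stage.
  final case class Boundary(child: ToyPlan) extends ToyPlan

  sealed trait Side
  case object BuildLeft extends Side
  case object BuildRight extends Side

  final case class ToyHashJoin(left: ToyPlan, right: ToyPlan, buildSide: Side)
    extends ToyPlan

  // Wrap only the build child; the streamed child stays in the join's stage.
  def isolateBuildSide(join: ToyHashJoin): ToyHashJoin = join.buildSide match {
    case BuildLeft  => join.copy(left = Boundary(join.left))
    case BuildRight => join.copy(right = Boundary(join.right))
  }
}
```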








[GitHub] [spark] cloud-fan commented on a change in pull request #29277: [SPARK-32421][SQL] Add code-gen for shuffled hash join

2020-07-30 Thread GitBox


cloud-fan commented on a change in pull request #29277:
URL: https://github.com/apache/spark/pull/29277#discussion_r462759609



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##
@@ -70,4 +74,54 @@ case class ShuffledHashJoinExec(
   join(streamIter, hashed, numOutputRows)
 }
   }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+streamedPlan.execute() :: buildPlan.execute() :: Nil
+  }
+
+  override def needCopyResult: Boolean = true
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+// inline mutable state since not many join operations in a task
+val streamedInput = ctx.addMutableState(
+  "scala.collection.Iterator", "streamedInput", v => s"$v = inputs[0];", forceInline = true)
+val streamedRow = ctx.addMutableState(
+  "InternalRow", "streamedRow", forceInline = true)
+val (streamInputVar, streamInputVarDecl) = createVars(ctx, streamedRow, streamedPlan.output)
+
+val join = joinType match {
+  case _: InnerLike => codegenInner(ctx, streamInputVar)
+  case LeftOuter | RightOuter => codegenOuter(ctx, streamInputVar)
+  case LeftSemi => codegenSemi(ctx, streamInputVar)
+  case LeftAnti => codegenAnti(ctx, streamInputVar)
+  case _: ExistenceJoin => codegenExistence(ctx, streamInputVar)
+  case x =>
+throw new IllegalArgumentException(
+  s"ShuffledHashJoin should not take $x as the JoinType")
+}
+
+s"""
+   |while ($streamedInput.hasNext()) {

Review comment:
   This is exactly what will be generated by 
`streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)`. See 
`InputAdapter.doProduce`. I don't understand why we can't put the join part in 
`doConsume`.
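
   To illustrate the split being suggested, here is a heavily simplified sketch 
of the produce/consume pattern. `ToyOp`, `ToyInput` and `ToyJoin` are 
hypothetical stand-ins, not Spark's `CodegenSupport` API, and `probe(relation, 
row)` is a placeholder for the join logic. The leaf owns the row loop, and the 
join only contributes per-row code from its consume method.

```scala
object ProduceConsumeSketch {
  // Hypothetical miniature of an operator that supports code generation.
  trait ToyOp {
    def produce(parent: ToyOp): String // emit code that drives rows into `parent`
    def consume(row: String): String   // emit code that handles one row
  }

  // Leaf that owns the row loop, in the spirit of what the comment
  // attributes to InputAdapter.doProduce.
  object ToyInput extends ToyOp {
    def produce(parent: ToyOp): String =
      s"""while (input.hasNext()) {
         |  InternalRow row = (InternalRow) input.next();
         |  ${parent.consume("row")}
         |}""".stripMargin

    def consume(row: String): String =
      throw new UnsupportedOperationException("leaf operators never consume")
  }

  // Join that emits no loop of its own: it asks the streamed child to
  // produce and supplies only the per-row probe. A real operator would also
  // forward joined rows to `parent`; that step is omitted here.
  final class ToyJoin(streamed: ToyOp) extends ToyOp {
    def produce(parent: ToyOp): String = streamed.produce(this)
    def consume(row: String): String = s"probe(relation, $row);"
  }
}
```

   Calling `produce` on a `ToyJoin` over `ToyInput` yields the `while` loop from 
the leaf with the probe statement inside it, without the join writing the loop 
itself.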

##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##
@@ -70,4 +74,54 @@ case class ShuffledHashJoinExec(
   join(streamIter, hashed, numOutputRows)
 }
   }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+streamedPlan.execute() :: buildPlan.execute() :: Nil
+  }
+
+  override def needCopyResult: Boolean = true
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+// inline mutable state since not many join operations in a task
+val streamedInput = ctx.addMutableState(
+  "scala.collection.Iterator", "streamedInput", v => s"$v = inputs[0];", forceInline = true)
+val streamedRow = ctx.addMutableState(
+  "InternalRow", "streamedRow", forceInline = true)
+val (streamInputVar, streamInputVarDecl) = createVars(ctx, streamedRow, streamedPlan.output)
+
+val join = joinType match {
+  case _: InnerLike => codegenInner(ctx, streamInputVar)
+  case LeftOuter | RightOuter => codegenOuter(ctx, streamInputVar)
+  case LeftSemi => codegenSemi(ctx, streamInputVar)
+  case LeftAnti => codegenAnti(ctx, streamInputVar)
+  case _: ExistenceJoin => codegenExistence(ctx, streamInputVar)
+  case x =>
+throw new IllegalArgumentException(
+  s"ShuffledHashJoin should not take $x as the JoinType")
+}
+
+s"""
+   |while ($streamedInput.hasNext()) {

Review comment:
   Note: 
https://github.com/apache/spark/pull/29277/files#diff-db4ffe4f0196a9d7cf1f04c350ee3381R124
   
   We actually build the relation in the class constructor, so the codegen flow 
should be the same as for broadcast hash join.
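
   As a rough sketch of that flow, the toy context below collects fields and 
their init statements, then renders a class whose constructor runs the inits 
before any row is processed. `ToyContext`, `addState`, `render` and 
`buildRelation()` are hypothetical and only loosely modelled on the 
`ctx.addMutableState(...)` calls in the diff; this is not Spark's 
`CodegenContext`.

```scala
object ConstructorStateSketch {
  import scala.collection.mutable.ArrayBuffer

  // Hypothetical miniature of a codegen context.
  final class ToyContext {
    private val fields = ArrayBuffer.empty[String]
    private val inits = ArrayBuffer.empty[String]

    // Registers a field plus the statement that initializes it and
    // returns the field name for use in generated code.
    def addState(javaType: String, name: String, init: String => String): String = {
      fields += s"private $javaType $name;"
      inits += init(name)
      name
    }

    // Renders a class whose constructor runs every init statement, so the
    // per-row body can assume the state already exists.
    def render(className: String, body: String): String =
      (Seq(s"class $className {") ++
        fields.map("  " + _) ++
        Seq(s"  $className() {") ++
        inits.map("    " + _) ++
        Seq("  }", s"  void processNext() { $body }", "}")).mkString("\n")
  }
}
```

   With the relation registered this way, the row-processing body only has to 
probe it, which is the same shape regardless of how the relation was obtained.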








[GitHub] [spark] cloud-fan commented on a change in pull request #29277: [SPARK-32421][SQL] Add code-gen for shuffled hash join

2020-07-29 Thread GitBox


cloud-fan commented on a change in pull request #29277:
URL: https://github.com/apache/spark/pull/29277#discussion_r462369012



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
##
@@ -316,6 +318,387 @@ trait HashJoin extends BaseJoinExec {
   resultProj(r)
 }
   }
+
+  /**
+   * Returns the code for generating join key for stream side, and expression of whether the key
+   * has any null in it or not.
+   */
+  protected def genStreamSideJoinKey(
+  ctx: CodegenContext,
+  input: Seq[ExprCode]): (ExprCode, String) = {
+ctx.currentVars = input
+if (streamedBoundKeys.length == 1 && streamedBoundKeys.head.dataType == LongType) {
+  // generate the join key as Long
+  val ev = streamedBoundKeys.head.genCode(ctx)
+  (ev, ev.isNull)
+} else {
+  // generate the join key as UnsafeRow
+  val ev = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+  (ev, s"${ev.value}.anyNull()")
+}
+  }
+
+  /**
+   * Generates the code for variable of build side.
+   */
+  private def genBuildSideVars(ctx: CodegenContext, matched: String): Seq[ExprCode] = {
+ctx.currentVars = null
+ctx.INPUT_ROW = matched
+buildPlan.output.zipWithIndex.map { case (a, i) =>
+  val ev = BoundReference(i, a.dataType, a.nullable).genCode(ctx)
+  if (joinType.isInstanceOf[InnerLike]) {
+ev
+  } else {
+// the variables are needed even there is no matched rows
+val isNull = ctx.freshName("isNull")
+val value = ctx.freshName("value")
+val javaType = CodeGenerator.javaType(a.dataType)
+val code = code"""
+  |boolean $isNull = true;
+  |$javaType $value = ${CodeGenerator.defaultValue(a.dataType)};
+  |if ($matched != null) {
+  |  ${ev.code}
+  |  $isNull = ${ev.isNull};
+  |  $value = ${ev.value};
+  |}
+ """.stripMargin
+ExprCode(code, JavaCode.isNullVariable(isNull), JavaCode.variable(value, a.dataType))
+  }
+}
+  }
+
+  /**
+   * Generate the (non-equi) condition used to filter joined rows. This is used in Inner, Left Semi
+   * and Left Anti joins.
+   */
+  protected def getJoinCondition(
+  ctx: CodegenContext,
+  input: Seq[ExprCode]): (String, String, Seq[ExprCode]) = {
+val matched = ctx.freshName("matched")
+val buildVars = genBuildSideVars(ctx, matched)
+val checkCondition = if (condition.isDefined) {
+  val expr = condition.get
+  // evaluate the variables from build side that used by condition
+  val eval = evaluateRequiredVariables(buildPlan.output, buildVars, expr.references)
+  // filter the output via condition
+  ctx.currentVars = input ++ buildVars
+  val ev =
+BindReferences.bindReference(expr, streamedPlan.output ++ buildPlan.output).genCode(ctx)
+  val skipRow = s"${ev.isNull} || !${ev.value}"
+  s"""
+ |$eval
+ |${ev.code}
+ |if (!($skipRow))
+   """.stripMargin
+} else {
+  ""
+}
+(matched, checkCondition, buildVars)
+  }
+
+  /**
+   * Generates the code for Inner join.
+   */
+  protected def codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String = {
+val (relationTerm, keyIsKnownUnique) = prepareRelation(ctx)
+val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
+val (matched, checkCondition, buildVars) = getJoinCondition(ctx, input)
+val numOutput = metricTerm(ctx, "numOutputRows")
+
+val resultVars = buildSide match {
+  case BuildLeft => buildVars ++ input
+  case BuildRight => input ++ buildVars
+}
+
+if (keyIsKnownUnique) {

Review comment:
   What do you mean? We already had this before, right? 
https://github.com/apache/spark/pull/29277/files#diff-4455c05ddcdb096c36d9e0bd326dfe12L325

##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
##
@@ -316,6 +318,387 @@ trait HashJoin extends BaseJoinExec {
   resultProj(r)
 }
   }
+
+  /**
+   * Returns the code for generating join key for stream side, and expression of whether the key
+   * has any null in it or not.
+   */
+  protected def genStreamSideJoinKey(
+  ctx: CodegenContext,
+  input: Seq[ExprCode]): (ExprCode, String) = {
+ctx.currentVars = input
+if (streamedBoundKeys.length == 1 && streamedBoundKeys.head.dataType == LongType) {
+  // generate the join key as Long
+  val ev = streamedBoundKeys.head.genCode(ctx)
+  (ev, ev.isNull)
+} else {
+  // generate the join key as UnsafeRow
+  val ev = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+  (ev, s"${ev.value}.anyNull()")
+}
+  }
+
+  /**
+   * Generates the code for variable of build side.
+   */
+  private def genBuildSideVars(ctx: CodegenContext, matched: String): Seq[ExprCode] = {
+ctx.currentVars = null
+ctx.INPUT_ROW = matched
+

[GitHub] [spark] cloud-fan commented on a change in pull request #29277: [SPARK-32421][SQL] Add code-gen for shuffled hash join

2020-07-29 Thread GitBox


cloud-fan commented on a change in pull request #29277:
URL: https://github.com/apache/spark/pull/29277#discussion_r462035327



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
##
@@ -316,6 +318,387 @@ trait HashJoin extends BaseJoinExec {
   resultProj(r)
 }
   }
+

Review comment:
   Can you use PR comments to highlight the real changes? It seems most of the 
diff is just moving code around.




