Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13065#discussion_r88755507
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
    @@ -103,5 +109,192 @@ case class GenerateExec(
           }
         }
       }
    -}
     
    +  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    +    child.asInstanceOf[CodegenSupport].inputRDDs()
    +  }
    +
    +  protected override def doProduce(ctx: CodegenContext): String = {
    +    child.asInstanceOf[CodegenSupport].produce(ctx, this)
    +  }
    +
    +  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: 
ExprCode): String = {
    +    ctx.currentVars = input
    +    ctx.copyResult = true
    +
    +    // Add input rows to the values when we are joining
    +    val values = if (join) {
    +      input
    +    } else {
    +      Seq.empty
    +    }
    +
    +    boundGenerator match {
    +      case e: CollectionGenerator => codeGenCollection(ctx, e, values, row)
    +      case g => codeGenTraversableOnce(ctx, g, values, row)
    +    }
    +  }
    +
    +  /**
    +   * Generate code for [[CollectionGenerator]] expressions.
    +   */
    +  private def codeGenCollection(
    +      ctx: CodegenContext,
    +      e: CollectionGenerator,
    +      input: Seq[ExprCode],
    +      row: ExprCode): String = {
    +
    +    // Generate code for the generator.
    +    val data = e.genCode(ctx)
    +
    +    // Generate looping variables.
    +    val index = ctx.freshName("index")
    +
    +    // Add a check if the generate outer flag is true.
    +    val checks = optionalCode(outer, data.isNull)
    +
    +    // Add position
    +    val position = if (e.position) {
    +      Seq(ExprCode("", "false", index))
    +    } else {
    +      Seq.empty
    +    }
    +
    +    // Generate code for either ArrayData or MapData
    +    val (initMapData, updateRowData, values) = e.collectionType match {
    +      case ArrayType(st: StructType, nullable) if e.inline =>
    +        val row = codeGenAccessor(ctx, data.value, "col", index, st, 
nullable, checks)
    +        val fieldChecks = checks ++ optionalCode(nullable, row.isNull)
    +        val columns = st.fields.toSeq.zipWithIndex.map { case (f, i) =>
    +          codeGenAccessor(ctx, row.value, f.name, i.toString, f.dataType, 
f.nullable, fieldChecks)
    +        }
    +        ("", row.code, columns)
    +
    +      case ArrayType(dataType, nullable) =>
    +        ("", "", Seq(codeGenAccessor(ctx, data.value, "col", index, 
dataType, nullable, checks)))
    +
    +      case MapType(keyType, valueType, valueContainsNull) =>
    +        // Materialize the key and the value arrays before we enter the 
loop.
    +        val keyArray = ctx.freshName("keyArray")
    +        val valueArray = ctx.freshName("valueArray")
    +        val initArrayData =
    +          s"""
    +             |ArrayData $keyArray = ${data.isNull} ? null : 
${data.value}.keyArray();
    +             |ArrayData $valueArray = ${data.isNull} ? null : 
${data.value}.valueArray();
    +           """.stripMargin
    +        val values = Seq(
    +          codeGenAccessor(ctx, keyArray, "key", index, keyType, nullable = 
false, checks),
    +          codeGenAccessor(ctx, valueArray, "value", index, valueType, 
valueContainsNull, checks))
    +        (initArrayData, "", values)
    +    }
    +
    +    // In case of outer=true we need to make sure the loop is executed 
at-least once when the
    +    // array/map contains no input. We do this by setting the looping 
index to -1 if there is no
    +    // input, evaluation of the array is prevented by a check in the 
accessor code.
    +    val numElements = ctx.freshName("numElements")
    +    val init = if (outer) {
    +      s"$numElements == 0 ? -1 : 0"
    +    } else {
    +      "0"
    +    }
    +    val numOutput = metricTerm(ctx, "numOutputRows")
    +    s"""
    +       |${data.code}
    +       |$initMapData
    +       |int $numElements = ${data.isNull} ? 0 : 
${data.value}.numElements();
    +       |for (int $index = $init; $index < $numElements; $index++) {
    +       |  $numOutput.add(1);
    +       |  $updateRowData
    +       |  ${consume(ctx, input ++ position ++ values)}
    +       |}
    +     """.stripMargin
    +  }
    +
    +  /**
    +   * Generate code for a regular [[TraversableOnce]] returning 
[[Generator]].
    +   */
    +  private def codeGenTraversableOnce(
    +      ctx: CodegenContext,
    +      e: Expression,
    +      input: Seq[ExprCode],
    +      row: ExprCode): String = {
    +
    +    // Generate the code for the generator
    +    val data = e.genCode(ctx)
    +
    +    // Generate looping variables.
    +    val iterator = ctx.freshName("iterator")
    +    val hasNext = ctx.freshName("hasNext")
    +    val current = ctx.freshName("row")
    +
    +    // Add a check if the generate outer flag is true.
    +    val checks = optionalCode(outer, s"!$hasNext")
    +    val values = e.dataType match {
    +      case ArrayType(st: StructType, nullable) =>
    +        st.fields.toSeq.zipWithIndex.map { case (f, i) =>
    +          codeGenAccessor(ctx, current, f.name, s"$i", f.dataType, 
f.nullable, checks)
    +        }
    +    }
    +
    +    // In case of outer=true we need to make sure the loop is executed 
at-least-once when the
    +    // iterator contains no input. We do this by adding an 'outer' 
variable which guarantees
    +    // execution of the first iteration even if there is no input. 
Evaluation of the iterator is
    +    // prevented by checks in the next() and accessor code.
    +    val hasNextCode = s"$hasNext = $iterator.hasNext()"
    +    val outerVal = ctx.freshName("outer")
    +    val (init, check, update, next) = if (outer) {
    +      (s"boolean $hasNextCode, $outerVal = true",
    +       s"$hasNext || $outerVal",
    +       s"$hasNextCode, $outerVal = false",
    +       s"$hasNext ? $iterator.next() : null")
    +    } else {
    +      (s"boolean $hasNextCode",
    +       s"$hasNext",
    +       s"$hasNextCode",
    +       s"$iterator.next()")
    +    }
    +    val numOutput = metricTerm(ctx, "numOutputRows")
    +    s"""
    +       |${data.code}
    +       |scala.collection.Iterator<InternalRow> $iterator = 
${data.value}.toIterator();
    +       |for ($init; $check; $update) {
    +       |  $numOutput.add(1);
    +       |  InternalRow $current = (InternalRow)($next);
    +       |  ${consume(ctx, input ++ values)}
    +       |}
    +     """.stripMargin
    --- End diff --
    
    Minor: I'm thinking of something like this, with a separate `while` loop for the outer and non-outer cases:
    ```
    if (outer) {
        s"""
            |${data.code}
            |scala.collection.Iterator<InternalRow> $iterator = 
${data.value}.toIterator();
            |boolean $outerVal = true;
            |while ($iterator.hasNext() || $outerVal) {
            |  $numOutput.add(1);
            |  InternalRow $current = (InternalRow)($iterator.hasNext()? 
$iterator.next() : null);
            |  $outerVal = false;
            |  ${consume(ctx, input ++ values)}
            |}
          """.stripMargin
    } else {
        s"""
            |${data.code}
            |scala.collection.Iterator<InternalRow> $iterator = 
${data.value}.toIterator();
            |while ($iterator.hasNext()) {
            |  $numOutput.add(1);
            |  InternalRow $current = (InternalRow)($iterator.next());
            |  ${consume(ctx, input ++ values)}
            |}
          """.stripMargin
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to