Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8747#discussion_r40962839
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
 ---
    @@ -393,10 +393,278 @@ object GenerateUnsafeProjection extends 
CodeGenerator[Seq[Expression], UnsafePro
         case _ => input
       }
     
    +  val rowWriterClass = classOf[UnsafeRowWriter].getName
    +  val arrayWriterClass = classOf[UnsafeArrayWriter].getName
    +
    +  // todo: if the nullability of field is correct, we can use it to save 
null check.
    +  private def writeStructToBuffer(
    +      ctx: CodeGenContext,
    +      input: String,
    +      fieldTypes: Seq[DataType],
    +      bufferHolder: String): String = {
    +    val fieldEvals = fieldTypes.zipWithIndex.map { case (dt, i) =>
    +      val fieldName = ctx.freshName("fieldName")
    +      val code = s"final ${ctx.javaType(dt)} $fieldName = 
${ctx.getValue(input, dt, i.toString)};"
    +      val isNull = s"$input.isNullAt($i)"
    +      GeneratedExpressionCode(code, isNull, fieldName)
    +    }
    +
    +    s"""
    +      if ($input instanceof UnsafeRow) {
    +        $rowWriterClass.directWrite($bufferHolder, (UnsafeRow) $input);
    +      } else {
    +        ${writeExpressionsToBuffer(ctx, input, fieldEvals, fieldTypes, 
bufferHolder)}
    +      }
    +    """
    +  }
    +
    +  private def writeExpressionsToBuffer(
    +      ctx: CodeGenContext,
    +      row: String,
    +      inputs: Seq[GeneratedExpressionCode],
    +      inputTypes: Seq[DataType],
    +      bufferHolder: String): String = {
    +    val rowWriter = ctx.freshName("rowWriter")
    +    ctx.addMutableState(rowWriterClass, rowWriter, s"this.$rowWriter = new 
$rowWriterClass();")
    +
    +    val writeFields = inputs.zip(inputTypes).zipWithIndex.map {
    +      case ((input, dt), index) =>
    +        val tmpCursor = ctx.freshName("tmpCursor")
    +
    +        val setNull = dt match {
    +          case t: DecimalType if t.precision > Decimal.MAX_LONG_DIGITS =>
    +            // Can't call setNullAt() for DecimalType with precision 
larger than 18.
    +            s"$rowWriter.write($index, null, 0, 0);"
    +          case _ => s"$rowWriter.setNullAt($index);"
    +        }
    +
    +        val writeField = dt match {
    +          case t: StructType =>
    +            s"""
    +              // Remember the current cursor so that we can calculate how 
many bytes are
    +              // written later.
    +              final int $tmpCursor = $bufferHolder.cursor;
    +              ${writeStructToBuffer(ctx, input.primitive, 
t.map(_.dataType), bufferHolder)}
    +              $rowWriter.setOffsetAndSize($index, $tmpCursor, 
$bufferHolder.cursor - $tmpCursor);
    +            """
    +
    +          case a @ ArrayType(et, _) =>
    +            s"""
    +              // Remember the current cursor so that we can calculate how 
many bytes are
    +              // written later.
    +              final int $tmpCursor = $bufferHolder.cursor;
    +              ${writeArrayToBuffer(ctx, input.primitive, et, bufferHolder)}
    +              $rowWriter.setOffsetAndSize($index, $tmpCursor, 
$bufferHolder.cursor - $tmpCursor);
    +              $rowWriter.alignWords($bufferHolder.cursor - $tmpCursor);
    +            """
    +
    +          case m @ MapType(kt, vt, _) =>
    +            s"""
    +              // Remember the current cursor so that we can calculate how 
many bytes are
    +              // written later.
    +              final int $tmpCursor = $bufferHolder.cursor;
    +              ${writeMapToBuffer(ctx, input.primitive, kt, vt, 
bufferHolder)}
    +              $rowWriter.setOffsetAndSize($index, $tmpCursor, 
$bufferHolder.cursor - $tmpCursor);
    +              $rowWriter.alignWords($bufferHolder.cursor - $tmpCursor);
    +            """
    +
    +          case _ if ctx.isPrimitiveType(dt) =>
    +            val fieldOffset = ctx.freshName("fieldOffset")
    +            s"""
    +              final long $fieldOffset = $rowWriter.getFieldOffset($index);
    +              Platform.putLong($bufferHolder.buffer, $fieldOffset, 0L);
    +              ${writePrimitiveType(ctx, input.primitive, dt, 
s"$bufferHolder.buffer", fieldOffset)}
    +            """
    +
    +          case t: DecimalType if t.precision <= Decimal.MAX_LONG_DIGITS =>
    +            s"$rowWriter.writeCompactDecimal($index, ${input.primitive}, " 
+
    +              s"${t.precision}, ${t.scale});"
    +
    +          case t: DecimalType =>
    +            s"$rowWriter.write($index, ${input.primitive}, ${t.precision}, 
${t.scale});"
    +
    +          case NullType => ""
    +
    +          case _ => s"$rowWriter.write($index, ${input.primitive});"
    +        }
    +
    +        s"""
    +          ${input.code}
    +          if (${input.isNull}) {
    +            $setNull
    +          } else {
    +            $writeField
    +          }
    +        """
    +    }
    +
    +    s"""
    +      $rowWriter.initialize($bufferHolder, ${inputs.length});
    +      ${ctx.splitExpressions(row, writeFields)}
    +    """
    +  }
    +
    +  // todo: if the nullability of array element is correct, we can use it 
to save null check.
    +  private def writeArrayToBuffer(
    +      ctx: CodeGenContext,
    +      input: String,
    +      elementType: DataType,
    +      bufferHolder: String,
    +      needHeader: Boolean = true): String = {
    +    val arrayWriter = ctx.freshName("arrayWriter")
    +    ctx.addMutableState(arrayWriterClass, arrayWriter,
    +      s"this.$arrayWriter = new $arrayWriterClass();")
    +    val numElements = ctx.freshName("numElements")
    +    val index = ctx.freshName("index")
    +    val element = ctx.freshName("element")
    +
    +    val jt = ctx.javaType(elementType)
    +
    +    val fixedElementSize = elementType match {
    +      case t: DecimalType if t.precision <= Decimal.MAX_LONG_DIGITS => 8
    +      case _ if ctx.isPrimitiveType(jt) => elementType.defaultSize
    +      case _ => 0
    +    }
    +
    +    val writeElement = elementType match {
    +      case t: StructType =>
    +        s"""
    +          $arrayWriter.setOffset($index);
    +          ${writeStructToBuffer(ctx, element, t.map(_.dataType), 
bufferHolder)}
    +        """
    +
    +      case a @ ArrayType(et, _) =>
    +        s"""
    +          $arrayWriter.setOffset($index);
    +          ${writeArrayToBuffer(ctx, element, et, bufferHolder)}
    +        """
    +
    +      case m @ MapType(kt, vt, _) =>
    +        s"""
    +          $arrayWriter.setOffset($index);
    +          ${writeMapToBuffer(ctx, element, kt, vt, bufferHolder)}
    +        """
    +
    +      case _ if ctx.isPrimitiveType(elementType) =>
    +        // Should we do word align?
    +        val dataSize = elementType.defaultSize
    +
    +        s"""
    +          $arrayWriter.setOffset($index);
    +          ${writePrimitiveType(ctx, element, elementType,
    +            s"$bufferHolder.buffer", s"$bufferHolder.cursor")}
    +          $bufferHolder.cursor += $dataSize;
    +        """
    +
    +      case t: DecimalType if t.precision <= Decimal.MAX_LONG_DIGITS =>
    +        s"$arrayWriter.writeCompactDecimal($index, $element, 
${t.precision}, ${t.scale});"
    +
    +      case t: DecimalType =>
    +        s"$arrayWriter.write($index, $element, ${t.precision}, 
${t.scale});"
    +
    +      case NullType => ""
    +
    +      case _ => s"$arrayWriter.write($index, $element);"
    +    }
    +
    +    s"""
    +      if ($input instanceof UnsafeArrayData) {
    +        $arrayWriterClass.directWrite($bufferHolder, (UnsafeArrayData) 
$input, $needHeader);
    +      } else {
    +        final int $numElements = $input.numElements();
    +        $arrayWriter.initialize($bufferHolder, $numElements, $needHeader, 
$fixedElementSize);
    +
    +        for (int $index = 0; $index < $numElements; $index++) {
    +          if ($input.isNullAt($index)) {
    +            $arrayWriter.setNullAt($index);
    +          } else {
    +            final $jt $element = ${ctx.getValue(input, elementType, 
index)};
    +            $writeElement
    +          }
    +        }
    +      }
    +    """
    +  }
    +
    +  // todo: if the nullability of value element is correct, we can use it 
to save null check.
    +  private def writeMapToBuffer(
    +      ctx: CodeGenContext,
    +      input: String,
    +      keyType: DataType,
    +      valueType: DataType,
    +      bufferHolder: String): String = {
    +    val keys = ctx.freshName("keys")
    +    val values = ctx.freshName("values")
    +    val tmpCursor = ctx.freshName("tmpCursor")
    +
    +    s"""
    +      final ArrayData $keys = $input.keyArray();
    +      final ArrayData $values = $input.valueArray();
    +
    +      // Write the numElements into first 4 bytes.
    +      Platform.putInt($bufferHolder.buffer, $bufferHolder.cursor, 
$keys.numElements());
    +
    +      $bufferHolder.cursor += 8;
    +      // Remember the current cursor so that we can write numBytes of key 
array later.
    +      final int $tmpCursor = $bufferHolder.cursor;
    +
    +      ${writeArrayToBuffer(ctx, keys, keyType, bufferHolder, needHeader = 
false)}
    +      // Write the numBytes of key array into second 4 bytes.
    +      Platform.putInt($bufferHolder.buffer, $tmpCursor - 4, 
$bufferHolder.cursor - $tmpCursor);
    --- End diff --
    
    Could you document the format of UnsafeArrayData and UnsafeMapData 
somewhere?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to