Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/8747#discussion_r40961647
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
---
@@ -393,10 +393,278 @@ object GenerateUnsafeProjection extends
CodeGenerator[Seq[Expression], UnsafePro
case _ => input
}
+ val rowWriterClass = classOf[UnsafeRowWriter].getName
+ val arrayWriterClass = classOf[UnsafeArrayWriter].getName
+
+ // todo: if the nullability of field is correct, we can use it to save null check.
+ private def writeStructToBuffer(
+ ctx: CodeGenContext,
+ input: String,
+ fieldTypes: Seq[DataType],
+ bufferHolder: String): String = {
+ val fieldEvals = fieldTypes.zipWithIndex.map { case (dt, i) =>
+ val fieldName = ctx.freshName("fieldName")
+ val code = s"final ${ctx.javaType(dt)} $fieldName =
${ctx.getValue(input, dt, i.toString)};"
+ val isNull = s"$input.isNullAt($i)"
+ GeneratedExpressionCode(code, isNull, fieldName)
+ }
+
+ s"""
+ if ($input instanceof UnsafeRow) {
+ $rowWriterClass.directWrite($bufferHolder, (UnsafeRow) $input);
+ } else {
+ ${writeExpressionsToBuffer(ctx, input, fieldEvals, fieldTypes,
bufferHolder)}
+ }
+ """
+ }
+
+ private def writeExpressionsToBuffer(
+ ctx: CodeGenContext,
+ row: String,
+ inputs: Seq[GeneratedExpressionCode],
+ inputTypes: Seq[DataType],
+ bufferHolder: String): String = {
+ val rowWriter = ctx.freshName("rowWriter")
+ ctx.addMutableState(rowWriterClass, rowWriter, s"this.$rowWriter = new
$rowWriterClass();")
+
+ val writeFields = inputs.zip(inputTypes).zipWithIndex.map {
+ case ((input, dt), index) =>
+ val tmpCursor = ctx.freshName("tmpCursor")
+
+ val setNull = dt match {
+ case t: DecimalType if t.precision > Decimal.MAX_LONG_DIGITS =>
+ // Can't call setNullAt() for DecimalType with precision
larger than 18.
+ s"$rowWriter.write($index, null, 0, 0);"
+ case _ => s"$rowWriter.setNullAt($index);"
+ }
+
+ val writeField = dt match {
+ case t: StructType =>
+ s"""
+ // Remember the current cursor so that we can calculate how
many bytes are
+ // written later.
+ final int $tmpCursor = $bufferHolder.cursor;
+ ${writeStructToBuffer(ctx, input.primitive,
t.map(_.dataType), bufferHolder)}
+ $rowWriter.setOffsetAndSize($index, $tmpCursor,
$bufferHolder.cursor - $tmpCursor);
+ """
+
+ case a @ ArrayType(et, _) =>
+ s"""
+ // Remember the current cursor so that we can calculate how
many bytes are
+ // written later.
+ final int $tmpCursor = $bufferHolder.cursor;
+ ${writeArrayToBuffer(ctx, input.primitive, et, bufferHolder)}
+ $rowWriter.setOffsetAndSize($index, $tmpCursor,
$bufferHolder.cursor - $tmpCursor);
+ $rowWriter.alignWords($bufferHolder.cursor - $tmpCursor);
+ """
+
+ case m @ MapType(kt, vt, _) =>
+ s"""
+ // Remember the current cursor so that we can calculate how
many bytes are
+ // written later.
+ final int $tmpCursor = $bufferHolder.cursor;
+ ${writeMapToBuffer(ctx, input.primitive, kt, vt,
bufferHolder)}
+ $rowWriter.setOffsetAndSize($index, $tmpCursor,
$bufferHolder.cursor - $tmpCursor);
+ $rowWriter.alignWords($bufferHolder.cursor - $tmpCursor);
+ """
+
+ case _ if ctx.isPrimitiveType(dt) =>
+ val fieldOffset = ctx.freshName("fieldOffset")
+ s"""
+ final long $fieldOffset = $rowWriter.getFieldOffset($index);
+ Platform.putLong($bufferHolder.buffer, $fieldOffset, 0L);
+ ${writePrimitiveType(ctx, input.primitive, dt,
s"$bufferHolder.buffer", fieldOffset)}
+ """
+
+ case t: DecimalType if t.precision <= Decimal.MAX_LONG_DIGITS =>
+ s"$rowWriter.writeCompactDecimal($index, ${input.primitive}, "
+
+ s"${t.precision}, ${t.scale});"
+
+ case t: DecimalType =>
+ s"$rowWriter.write($index, ${input.primitive}, ${t.precision},
${t.scale});"
+
+ case NullType => ""
+
+ case _ => s"$rowWriter.write($index, ${input.primitive});"
+ }
+
+ s"""
+ ${input.code}
+ if (${input.isNull}) {
+ $setNull
+ } else {
+ $writeField
+ }
+ """
+ }
+
+ s"""
+ $rowWriter.initialize($bufferHolder, ${inputs.length});
+ ${ctx.splitExpressions(row, writeFields)}
+ """
+ }
+
+ // todo: if the nullability of array element is correct, we can use it to save null check.
+ private def writeArrayToBuffer(
+ ctx: CodeGenContext,
+ input: String,
+ elementType: DataType,
+ bufferHolder: String,
+ needHeader: Boolean = true): String = {
+ val arrayWriter = ctx.freshName("arrayWriter")
+ ctx.addMutableState(arrayWriterClass, arrayWriter,
+ s"this.$arrayWriter = new $arrayWriterClass();")
+ val numElements = ctx.freshName("numElements")
+ val index = ctx.freshName("index")
+ val element = ctx.freshName("element")
+
+ val jt = ctx.javaType(elementType)
+
+ val fixedElementSize = elementType match {
+ case t: DecimalType if t.precision <= Decimal.MAX_LONG_DIGITS => 8
+ case _ if ctx.isPrimitiveType(jt) => elementType.defaultSize
+ case _ => 0
+ }
+
+ val writeElement = elementType match {
+ case t: StructType =>
+ s"""
+ $arrayWriter.setOffset($index);
+ ${writeStructToBuffer(ctx, element, t.map(_.dataType),
bufferHolder)}
+ """
+
+ case a @ ArrayType(et, _) =>
+ s"""
+ $arrayWriter.setOffset($index);
+ ${writeArrayToBuffer(ctx, element, et, bufferHolder)}
+ """
+
+ case m @ MapType(kt, vt, _) =>
+ s"""
+ $arrayWriter.setOffset($index);
+ ${writeMapToBuffer(ctx, element, kt, vt, bufferHolder)}
+ """
+
+ case _ if ctx.isPrimitiveType(elementType) =>
+ // Should we do word align?
+ val dataSize = elementType.defaultSize
+
+ s"""
+ $arrayWriter.setOffset($index);
+ ${writePrimitiveType(ctx, element, elementType,
+ s"$bufferHolder.buffer", s"$bufferHolder.cursor")}
+ $bufferHolder.cursor += $dataSize;
+ """
+
+ case t: DecimalType if t.precision <= Decimal.MAX_LONG_DIGITS =>
+ s"$arrayWriter.writeCompactDecimal($index, $element,
${t.precision}, ${t.scale});"
+
+ case t: DecimalType =>
+ s"$arrayWriter.write($index, $element, ${t.precision},
${t.scale});"
+
+ case NullType => ""
+
+ case _ => s"$arrayWriter.write($index, $element);"
+ }
+
+ s"""
+ if ($input instanceof UnsafeArrayData) {
+ $arrayWriterClass.directWrite($bufferHolder, (UnsafeArrayData)
$input, $needHeader);
+ } else {
+ final int $numElements = $input.numElements();
+ $arrayWriter.initialize($bufferHolder, $numElements, $needHeader,
$fixedElementSize);
+
+ for (int $index = 0; $index < $numElements; $index++) {
+ if ($input.isNullAt($index)) {
+ $arrayWriter.setNullAt($index);
+ } else {
+ final $jt $element = ${ctx.getValue(input, elementType,
index)};
+ $writeElement
+ }
+ }
+ }
+ """
+ }
+
+ // todo: if the nullability of value element is correct, we can use it to save null check.
+ private def writeMapToBuffer(
+ ctx: CodeGenContext,
+ input: String,
+ keyType: DataType,
+ valueType: DataType,
+ bufferHolder: String): String = {
+ val keys = ctx.freshName("keys")
+ val values = ctx.freshName("values")
+ val tmpCursor = ctx.freshName("tmpCursor")
+
+ s"""
+ final ArrayData $keys = $input.keyArray();
+ final ArrayData $values = $input.valueArray();
+
+ // Write the numElements into first 4 bytes.
+ Platform.putInt($bufferHolder.buffer, $bufferHolder.cursor,
$keys.numElements());
--- End diff --
Should we call grow first and zero out the unused 4 bytes?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA
ticket with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org