Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19651#discussion_r150396893

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.orc
    +
    +import org.apache.hadoop.io._
    +import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
    +import org.apache.orc.storage.common.`type`.HiveDecimal
    +import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
    +import org.apache.spark.sql.catalyst.util._
    +import org.apache.spark.sql.execution.datasources.orc.OrcUtils.getTypeDescription
    +import org.apache.spark.sql.types._
    +import org.apache.spark.unsafe.types.UTF8String
    +
    +private[orc] class OrcSerializer(dataSchema: StructType) {
    +
    +  private[this] lazy val orcStruct: OrcStruct =
    +    createOrcValue(dataSchema).asInstanceOf[OrcStruct]
    +
    +  private[this] val writableWrappers =
    +    dataSchema.fields.map(f => getWritableWrapper(f.dataType))
    +
    +  def serialize(row: InternalRow): OrcStruct = {
    +    convertInternalRowToOrcStruct(row, dataSchema, Some(writableWrappers), Some(orcStruct))
    +  }
    +
    +  /**
    +   * Returns an ORC value object for the given Spark schema.
    +   */
    +  private[this] def createOrcValue(dataType: DataType) =
    +    OrcStruct.createValue(getTypeDescription(dataType))
    +
    +  /**
    +   * Converts an Apache Spark InternalRow to an Apache ORC OrcStruct.
    +   */
    +  private[this] def convertInternalRowToOrcStruct(
    +      row: InternalRow,
    +      schema: StructType,
    +      valueWrappers: Option[Seq[Any => Any]] = None,
    +      struct: Option[OrcStruct] = None): OrcStruct = {
    +    val wrappers =
    +      valueWrappers.getOrElse(schema.fields.map(_.dataType).map(getWritableWrapper).toSeq)
    +    val orcStruct = struct.getOrElse(createOrcValue(schema).asInstanceOf[OrcStruct])
    +
    +    for (schemaIndex <- 0 until schema.length) {
    +      val fieldType = schema(schemaIndex).dataType
    +      if (row.isNullAt(schemaIndex)) {
    +        orcStruct.setFieldValue(schemaIndex, null)
    +      } else {
    +        val field = row.get(schemaIndex, fieldType)
    +        val fieldValue = wrappers(schemaIndex)(field).asInstanceOf[WritableComparable[_]]
    +        orcStruct.setFieldValue(schemaIndex, fieldValue)
    +      }
    +    }
    +    orcStruct
    +  }
    +
    +  private[this] def withNullSafe(f: Any => Any): Any => Any = {
    +    input => if (input == null) null else f(input)
    +  }
    +
    +  /**
    +   * Builds a function that returns a WritableComparable ahead of time, according to the
    +   * given DataType, to avoid per-row pattern matching and branching costs.
    +   */
    +  private[this] def getWritableWrapper(dataType: DataType): Any => Any = dataType match {
    +    case NullType => _ => null
    +
    +    case BooleanType => withNullSafe(o => new BooleanWritable(o.asInstanceOf[Boolean]))
    +
    +    case ByteType => withNullSafe(o => new ByteWritable(o.asInstanceOf[Byte]))
    --- End diff --

    Yep. I'll try to avoid boxing.
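For context, the boxing mentioned here comes from the `Any => Any` wrapper shape: every primitive passes through a `java.lang.Boolean`/`java.lang.Byte` box before `o.asInstanceOf[Boolean]` unwraps it again. Below is a minimal sketch of one way to avoid that, not the final code from this pull request: it assumes a hypothetical `Converter` type alias and leans on the existing Catalyst `SpecializedGetters` interface (which `InternalRow` implements), so the converter reads the primitive straight out of the row by ordinal instead of accepting a boxed value.

    import org.apache.hadoop.io.{BooleanWritable, ByteWritable}

    import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
    import org.apache.spark.sql.types._

    object NoBoxingSketch {
      // Hypothetical converter shape: take the row (as SpecializedGetters) and a
      // field ordinal, so primitives never need to be boxed into Any.
      type Converter = (SpecializedGetters, Int) => Any

      def makeConverter(dataType: DataType): Converter = dataType match {
        case BooleanType =>
          // getter.getBoolean returns a primitive Boolean; the only remaining
          // allocation is the BooleanWritable itself, which could also be reused.
          (getter, ordinal) => new BooleanWritable(getter.getBoolean(ordinal))

        case ByteType =>
          (getter, ordinal) => new ByteWritable(getter.getByte(ordinal))

        case other =>
          throw new UnsupportedOperationException(s"$other is not covered by this sketch")
      }
    }

With this shape the null check stays in the caller, mirroring the `row.isNullAt(schemaIndex)` branch in the posted diff, since the primitive getters are only safe to call on non-null fields.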