Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/360#discussion_r13892425
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala ---
    @@ -0,0 +1,667 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.parquet
    +
    +import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}
    +
    +import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, 
Converter}
    +import parquet.schema.MessageType
    +
    +import org.apache.spark.sql.catalyst.types._
    +import org.apache.spark.sql.catalyst.expressions.{GenericRow, Row, 
Attribute}
    +import org.apache.spark.sql.parquet.CatalystConverter.FieldType
    +
    +/**
    + * Collection of converters of Parquet types (group and primitive types) 
that
    + * model arrays and maps. The conversions are partly based on the 
AvroParquet
    + * converters that are part of Parquet in order to be able to process these
    + * types.
    + *
    + * There are several types of converters:
    + * <ul>
    + *   <li>[[org.apache.spark.sql.parquet.CatalystPrimitiveConverter]] for 
primitive
    + *   (numeric, boolean and String) types</li>
    + *   <li>[[org.apache.spark.sql.parquet.CatalystNativeArrayConverter]] for 
arrays
    + *   of native JVM element types; note: currently null values are not 
supported!</li>
    + *   <li>[[org.apache.spark.sql.parquet.CatalystArrayConverter]] for 
arrays of
    + *   arbitrary element types (including nested element types); note: 
currently
    + *   null values are not supported!</li>
    + *   <li>[[org.apache.spark.sql.parquet.CatalystStructConverter]] for 
structs</li>
    + *   <li>[[org.apache.spark.sql.parquet.CatalystMapConverter]] for maps; 
note:
    + *   currently null values are not supported!</li>
    + *   <li>[[org.apache.spark.sql.parquet.CatalystPrimitiveRowConverter]] 
for rows
    + *   of only primitive element types</li>
    + *   <li>[[org.apache.spark.sql.parquet.CatalystGroupConverter]] for other 
nested
    + *   records, including the top-level row record</li>
    + * </ul>
    + */
    +
    +private[sql] object CatalystConverter {
    +  // The type internally used for fields
    +  type FieldType = StructField
    +
    +  // This is mostly Parquet convention (see, e.g., `ConversionPatterns`).
    +  // Note that "array" for the array elements is chosen by ParquetAvro.
    +  // Using a different value will result in Parquet silently dropping 
columns.
    +  val ARRAY_ELEMENTS_SCHEMA_NAME = "array"
    +  val MAP_KEY_SCHEMA_NAME = "key"
    +  val MAP_VALUE_SCHEMA_NAME = "value"
    +  val MAP_SCHEMA_NAME = "map"
    +
    +  // TODO: consider using Array[T] for arrays to avoid boxing of primitive 
types
    +  type ArrayScalaType[T] = Seq[T]
    +  type StructScalaType[T] = Seq[T]
    +  type MapScalaType[K, V] = Map[K, V]
    +
    +  protected[parquet] def createConverter(
    +      field: FieldType,
    +      fieldIndex: Int,
    +      parent: CatalystConverter): Converter = {
    +    val fieldType: DataType = field.dataType
    +    fieldType match {
    +      // For native JVM types we use a converter with native arrays
    +      case ArrayType(elementType: NativeType) => {
    +        new CatalystNativeArrayConverter(elementType, fieldIndex, parent)
    +      }
    +      // This is for other types of arrays, including those with nested 
fields
    +      case ArrayType(elementType: DataType) => {
    +        new CatalystArrayConverter(elementType, fieldIndex, parent)
    +      }
    +      case StructType(fields: Seq[StructField]) => {
    +        new CatalystStructConverter(fields, fieldIndex, parent)
    +      }
    +      case MapType(keyType: DataType, valueType: DataType) => {
    +        new CatalystMapConverter(
    +          Seq(
    +            new FieldType(MAP_KEY_SCHEMA_NAME, keyType, false),
    +            new FieldType(MAP_VALUE_SCHEMA_NAME, valueType, true)),
    +            fieldIndex,
    +            parent)
    +      }
    +      // Strings, Shorts and Bytes do not have a corresponding type in 
Parquet
    +      // so we need to treat them separately
    +      case StringType => {
    +        new CatalystPrimitiveConverter(parent, fieldIndex) {
    +          override def addBinary(value: Binary): Unit =
    +            parent.updateString(fieldIndex, value)
    +        }
    +      }
    +      case ShortType => {
    +        new CatalystPrimitiveConverter(parent, fieldIndex) {
    +          override def addInt(value: Int): Unit =
    +            parent.updateShort(fieldIndex, 
value.asInstanceOf[ShortType.JvmType])
    +        }
    +      }
    +      case ByteType => {
    +        new CatalystPrimitiveConverter(parent, fieldIndex) {
    +          override def addInt(value: Int): Unit =
    +            parent.updateByte(fieldIndex, 
value.asInstanceOf[ByteType.JvmType])
    +        }
    +      }
    +      // All other primitive types use the default converter
    +      case ctype: NativeType => { // note: need the type tag here!
    +        new CatalystPrimitiveConverter(parent, fieldIndex)
    +      }
    +      case _ => throw new RuntimeException(
    +        s"unable to convert datatype ${field.dataType.toString} in 
CatalystConverter")
    +    }
    +  }
    +
    +  protected[parquet] def createRootConverter(
    +      parquetSchema: MessageType,
    +      attributes: Seq[Attribute]): CatalystConverter = {
    +    // For non-nested types we use the optimized Row converter
    +    if (attributes.forall(a => 
ParquetTypesConverter.isPrimitiveType(a.dataType))) {
    +      new CatalystPrimitiveRowConverter(attributes)
    +    } else {
    +      new CatalystGroupConverter(attributes)
    +    }
    +  }
    +}
    +
    +private[parquet] trait CatalystConverter {
    --- End diff --
    
    The abstraction overhead of a trait is nontrivially high, and also 
seemingly unnecessary here. Please make this an abstract class that extends 
GroupConverter. I achieved around an 8-10% performance improvement in a 
benchmark that reads several billion Longs from a particular column with this 
change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to