GitHub user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/360#discussion_r13876519
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala ---
    @@ -0,0 +1,667 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.parquet
    +
    +import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}
    +
    +import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, 
Converter}
    +import parquet.schema.MessageType
    +
    +import org.apache.spark.sql.catalyst.types._
    +import org.apache.spark.sql.catalyst.expressions.{GenericRow, Row, 
Attribute}
    +import org.apache.spark.sql.parquet.CatalystConverter.FieldType
    +
    +/**
    + * Collection of converters of Parquet types (group and primitive types) 
that
    + * model arrays and maps. The conversions are partly based on the 
AvroParquet
    + * converters that are part of Parquet in order to be able to process these
    + * types.
    + *
    + * There are several types of converters:
    + * <ul>
    + *   <li>[[org.apache.spark.sql.parquet.CatalystPrimitiveConverter]] for 
primitive
    + *   (numeric, boolean and String) types</li>
    + *   <li>[[org.apache.spark.sql.parquet.CatalystNativeArrayConverter]] for 
arrays
    + *   of native JVM element types; note: currently null values are not 
supported!</li>
    + *   <li>[[org.apache.spark.sql.parquet.CatalystArrayConverter]] for 
arrays of
    + *   arbitrary element types (including nested element types); note: 
currently
    + *   null values are not supported!</li>
    + *   <li>[[org.apache.spark.sql.parquet.CatalystStructConverter]] for 
structs</li>
    + *   <li>[[org.apache.spark.sql.parquet.CatalystMapConverter]] for maps; 
note:
    + *   currently null values are not supported!</li>
    + *   <li>[[org.apache.spark.sql.parquet.CatalystPrimitiveRowConverter]] 
for rows
    + *   of only primitive element types</li>
    + *   <li>[[org.apache.spark.sql.parquet.CatalystGroupConverter]] for other 
nested
    + *   records, including the top-level row record</li>
    + * </ul>
    + */
    +
    +private[sql] object CatalystConverter {
    +  // The type internally used for fields
    +  type FieldType = StructField
    +
    +  // This is mostly Parquet convention (see, e.g., `ConversionPatterns`).
    +  // Note that "array" for the array elements is chosen by ParquetAvro.
    +  // Using a different value will result in Parquet silently dropping 
columns.
    +  val ARRAY_ELEMENTS_SCHEMA_NAME = "array"
    +  val MAP_KEY_SCHEMA_NAME = "key"
    +  val MAP_VALUE_SCHEMA_NAME = "value"
    +  val MAP_SCHEMA_NAME = "map"
    +
    +  // TODO: consider using Array[T] for arrays to avoid boxing of primitive 
types
    +  type ArrayScalaType[T] = Seq[T]
    +  type StructScalaType[T] = Seq[T]
    +  type MapScalaType[K, V] = Map[K, V]
    +
    +  protected[parquet] def createConverter(
    +      field: FieldType,
    +      fieldIndex: Int,
    +      parent: CatalystConverter): Converter = {
    +    val fieldType: DataType = field.dataType
    +    fieldType match {
    +      // For native JVM types we use a converter with native arrays
    +      case ArrayType(elementType: NativeType) => {
    +        new CatalystNativeArrayConverter(elementType, fieldIndex, parent)
    +      }
    +      // This is for other types of arrays, including those with nested 
fields
    +      case ArrayType(elementType: DataType) => {
    +        new CatalystArrayConverter(elementType, fieldIndex, parent)
    +      }
    +      case StructType(fields: Seq[StructField]) => {
    +        new CatalystStructConverter(fields, fieldIndex, parent)
    +      }
    +      case MapType(keyType: DataType, valueType: DataType) => {
    +        new CatalystMapConverter(
    +          Seq(
    +            new FieldType(MAP_KEY_SCHEMA_NAME, keyType, false),
    +            new FieldType(MAP_VALUE_SCHEMA_NAME, valueType, true)),
    +            fieldIndex,
    +            parent)
    +      }
    +      // Strings, Shorts and Bytes do not have a corresponding type in 
Parquet
    +      // so we need to treat them separately
    +      case StringType => {
    +        new CatalystPrimitiveConverter(parent, fieldIndex) {
    +          override def addBinary(value: Binary): Unit =
    +            parent.updateString(fieldIndex, value)
    +        }
    +      }
    +      case ShortType => {
    +        new CatalystPrimitiveConverter(parent, fieldIndex) {
    +          override def addInt(value: Int): Unit =
    +            parent.updateShort(fieldIndex, 
value.asInstanceOf[ShortType.JvmType])
    +        }
    +      }
    +      case ByteType => {
    +        new CatalystPrimitiveConverter(parent, fieldIndex) {
    +          override def addInt(value: Int): Unit =
    +            parent.updateByte(fieldIndex, 
value.asInstanceOf[ByteType.JvmType])
    +        }
    +      }
    +      // All other primitive types use the default converter
    +      case ctype: NativeType => { // note: need the type tag here!
    +        new CatalystPrimitiveConverter(parent, fieldIndex)
    +      }
    +      case _ => throw new RuntimeException(
    +        s"unable to convert datatype ${field.dataType.toString} in 
CatalystConverter")
    +    }
    +  }
    +
    +  protected[parquet] def createRootConverter(
    +      parquetSchema: MessageType,
    +      attributes: Seq[Attribute]): CatalystConverter = {
    +    // For non-nested types we use the optimized Row converter
    +    if (attributes.forall(a => 
ParquetTypesConverter.isPrimitiveType(a.dataType))) {
    +      new CatalystPrimitiveRowConverter(attributes)
    +    } else {
    +      new CatalystGroupConverter(attributes)
    +    }
    +  }
    +}
    +
    +private[parquet] trait CatalystConverter {
    +  /**
    +   * The number of fields this group has
    +   */
    +  protected[parquet] val size: Int
    +
    +  /**
    +   * The index of this converter in the parent
    +   */
    +  protected[parquet] val index: Int
    +
    +  /**
    +   * The parent converter
    +   */
    +  protected[parquet] val parent: CatalystConverter
    +
    +  /**
    +   * Called by child converters to update their value in its parent (this).
    +   * Note that if possible the more specific update methods below should 
be used
    +   * to avoid auto-boxing of native JVM types.
    +   *
    +   * @param fieldIndex
    +   * @param value
    +   */
    +  protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit
    +
    +  protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): 
Unit =
    +    updateField(fieldIndex, value)
    +
    +  protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
    +    updateField(fieldIndex, value)
    +
    +  protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
    +    updateField(fieldIndex, value)
    +
    +  protected[parquet] def updateShort(fieldIndex: Int, value: Short): Unit =
    +    updateField(fieldIndex, value)
    +
    +  protected[parquet] def updateByte(fieldIndex: Int, value: Byte): Unit =
    +    updateField(fieldIndex, value)
    +
    +  protected[parquet] def updateDouble(fieldIndex: Int, value: Double): 
Unit =
    +    updateField(fieldIndex, value)
    +
    +  protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit =
    +    updateField(fieldIndex, value)
    +
    +  protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): 
Unit =
    +    updateField(fieldIndex, value.getBytes)
    +
    +  protected[parquet] def updateString(fieldIndex: Int, value: Binary): 
Unit =
    +    updateField(fieldIndex, value.toStringUsingUTF8)
    +
    +  protected[parquet] def isRootConverter: Boolean = parent == null
    +
    +  protected[parquet] def clearBuffer(): Unit
    +
    +  /**
    +   * Should only be called in the root (group) converter!
    +   *
    +   * @return
    +   */
    +  def getCurrentRecord: Row = throw new UnsupportedOperationException
    +}
    +
    +/**
    + * A `parquet.io.api.GroupConverter` that is able to convert a Parquet 
record
    + * to a [[org.apache.spark.sql.catalyst.expressions.Row]] object.
    + *
    + * @param schema The corresponding Catalyst schema in the form of a list 
of attributes.
    + */
    +private[parquet] class CatalystGroupConverter(
    +    protected[parquet] val schema: Seq[FieldType],
    +    protected[parquet] val index: Int,
    +    protected[parquet] val parent: CatalystConverter,
    +    protected[parquet] var current: ArrayBuffer[Any],
    +    protected[parquet] var buffer: ArrayBuffer[Row])
    +  extends GroupConverter with CatalystConverter {
    +
    +  def this(schema: Seq[FieldType], index: Int, parent: CatalystConverter) =
    +    this(
    +      schema,
    +      index,
    +      parent,
    +      current=null,
    +      buffer=new ArrayBuffer[Row](
    +        CatalystArrayConverter.INITIAL_ARRAY_SIZE))
    +
    +  /**
    +   * This constructor is used for the root converter only!
    +   */
    +  def this(attributes: Seq[Attribute]) =
    +    this(attributes.map(a => new FieldType(a.name, a.dataType, 
a.nullable)), 0, null)
    +
    +  protected [parquet] val converters: Array[Converter] =
    +    schema.map(field =>
    +      CatalystConverter.createConverter(field, schema.indexOf(field), 
this))
    +    .toArray
    +
    +  override val size = schema.size
    +
    +  override def getCurrentRecord: Row = {
    +    assert(isRootConverter, "getCurrentRecord should only be called in 
root group converter!")
    +    // TODO: use iterators if possible
    +    // Note: this will only ever be called in the root converter when the 
record has been
    +    // fully processed. Therefore it will be difficult to use mutable rows 
instead, since
    +    // a non-root converter could never be sure when it would be safe to 
re-use the buffer.
    +    new GenericRow(current.toArray)
    +  }
    +
    +  override def getConverter(fieldIndex: Int): Converter = 
converters(fieldIndex)
    +
    +  // for child converters to update upstream values
    +  override protected[parquet] def updateField(fieldIndex: Int, value: 
Any): Unit = {
    +    current.update(fieldIndex, value)
    +  }
    +
    +  override protected[parquet] def clearBuffer(): Unit = buffer.clear()
    +
    +  override def start(): Unit = {
    +    current = ArrayBuffer.fill(schema.length)(null)
    +    converters.foreach {
    +      converter => if (!converter.isPrimitive) {
    +        converter.asInstanceOf[CatalystConverter].clearBuffer
    +      }
    +    }
    +  }
    +
    +  override def end(): Unit = {
    +    if (!isRootConverter) {
    +      assert(current!=null) // there should be no empty groups
    +      buffer.append(new GenericRow(current.toArray))
    +      parent.updateField(index, new 
GenericRow(buffer.toArray.asInstanceOf[Array[Any]]))
    +    }
    +  }
    +}
    +
    +/**
    + * A `parquet.io.api.GroupConverter` that is able to convert a Parquet 
record
    + * to a [[org.apache.spark.sql.catalyst.expressions.Row]] object. Note 
that this
    + * converter is optimized for rows of primitive types (non-nested records).
    + */
    +private[parquet] class CatalystPrimitiveRowConverter(
    +    protected[parquet] val schema: Seq[FieldType],
    +    protected[parquet] var current: ParquetRelation.RowType)
    +  extends GroupConverter with CatalystConverter {
    +
    +  // This constructor is used for the root converter only
    +  def this(attributes: Seq[Attribute]) =
    +    this(
    +      attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)),
    +      new ParquetRelation.RowType(attributes.length))
    +
    +  protected [parquet] val converters: Array[Converter] =
    +    schema.map(field =>
    +      CatalystConverter.createConverter(field, schema.indexOf(field), 
this))
    +      .toArray
    +
    +  override val size = schema.size
    +
    +  override val index = 0
    +
    +  override val parent = null
    +
    +  // Should be only called in root group converter!
    +  override def getCurrentRecord: ParquetRelation.RowType = current
    +
    +  override def getConverter(fieldIndex: Int): Converter = 
converters(fieldIndex)
    +
    +  // for child converters to update upstream values
    +  override protected[parquet] def updateField(fieldIndex: Int, value: 
Any): Unit = {
    +    throw new UnsupportedOperationException // child converters should use 
the
    +    // specific update methods below
    +  }
    +
    +  override protected[parquet] def clearBuffer(): Unit = {}
    +
    +  override def start(): Unit = {
    +    var i = 0
    +    while (i < schema.length) {
    --- End diff --
    
    Evaluating `schema.length` in the loop condition on every iteration slows things 
down substantially in our profiling. We should just compute it once and keep it in a 
local variable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to