Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5526#discussion_r30005313
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala ---
    @@ -207,3 +250,235 @@ trait InsertableRelation {
     trait CatalystScan {
       def buildScan(requiredColumns: Seq[Attribute], filters: 
Seq[Expression]): RDD[Row]
     }
    +
    +/**
    + * ::Experimental::
    + * [[OutputWriter]] is used together with [[FSBasedRelation]] for 
persisting rows to the
    + * underlying file system.  Subclasses of [[OutputWriter]] must provide a 
zero-argument constructor.
    + * An [[OutputWriter]] instance is created and initialized when a new 
output file is opened on
    + * executor side.  This instance is used to persist rows to this single 
output file.
    + */
    +@Experimental
    +abstract class OutputWriter {
    +  /**
    +   * Initializes this [[OutputWriter]] before any rows are persisted.
    +   *
    +   * @param path Path of the file to which this [[OutputWriter]] is 
supposed to write.  Note that
    +   *        this may not point to the final output file.  For example, 
`FileOutputFormat` writes to
    +   *        temporary directories and then merges written files back to the 
final destination.  In
    +   *        this case, `path` points to a temporary output file under the 
temporary directory.
    +   * @param dataSchema Schema of the rows to be written. Partition columns 
are not included in the
    +   *        schema if the corresponding relation is partitioned.
    +   * @param context The Hadoop MapReduce task context.
    +   */
    +  def init(
    +      path: String,
    +      dataSchema: StructType,
    +      context: TaskAttemptContext): Unit = ()
    +
    +  /**
    +   * Persists a single row.  Invoked on the executor side.  When writing 
to dynamically partitioned
    +   * tables, dynamic partition columns are not included in rows to be 
written.
    +   */
    +  def write(row: Row): Unit
    +
    +  /**
    +   * Closes the [[OutputWriter]]. Invoked on the executor side after all 
rows are persisted, before
    +   * the task output is committed.
    +   */
    +  def close(): Unit
    +}
    +
    +/**
    + * ::Experimental::
    + * A [[BaseRelation]] that provides much of the common code required for 
formats that store their
    + * data to an HDFS compatible filesystem.
    + *
    + * For the read path, similar to [[PrunedFilteredScan]], it can eliminate 
unneeded columns and
    + * filter using selected predicates before producing an RDD containing all 
matching tuples as
    + * [[Row]] objects. In addition, when reading from Hive style partitioned 
tables stored in file
    + * systems, it's able to discover partitioning information from the paths 
of input directories, and
    + * perform partition pruning before reading the data. Subclasses of 
[[FSBasedRelation]] must
    + * override one of the three `buildScan` methods to implement the read 
path.
    + *
    + * For the write path, it provides the ability to write to both 
non-partitioned and partitioned
    + * tables.  Directory layout of the partitioned tables is compatible with 
Hive.
    + *
    + * @constructor This constructor is for internal uses only. The 
[[PartitionSpec]] argument is for
    + *              implementing metastore table conversion.
    + * @param paths Base paths of this relation.  For partitioned relations, 
it should be the root
    + *        directories of all partition directories.
    + * @param maybePartitionSpec An [[FSBasedRelation]] can be created with an 
optional
    + *        [[PartitionSpec]], so that partition discovery can be skipped.
    + */
    +@Experimental
    +abstract class FSBasedRelation private[sql](
    +    val paths: Array[String],
    +    maybePartitionSpec: Option[PartitionSpec])
    +  extends BaseRelation {
    +
    +  /**
    +   * Constructs an [[FSBasedRelation]].
    +   *
    +   * @param paths Base paths of this relation.  For partitioned relations, 
it should be the root
    +   *        directories of all partition directories.
    +   * @param partitionColumns Partition columns of this relation.
    +   */
    +  def this(paths: Array[String], partitionColumns: StructType) =
    +    this(paths, {
    +      if (partitionColumns.isEmpty) None
    +      else Some(PartitionSpec(partitionColumns, Array.empty[Partition]))
    +    })
    +
    +  /**
    +   * Constructs an [[FSBasedRelation]].
    +   *
    +   * @param paths Base paths of this relation.  For partitioned relations, 
it should be root
    +   *        directories of all partition directories.
    +   */
    +  def this(paths: Array[String]) = this(paths, None)
    +
    +  private val hadoopConf = new 
Configuration(sqlContext.sparkContext.hadoopConfiguration)
    +
    +  private val codegenEnabled = sqlContext.conf.codegenEnabled
    +
    +  private var _partitionSpec: PartitionSpec = maybePartitionSpec.map { 
spec =>
    +    spec.copy(partitionColumns = spec.partitionColumns.asNullable)
    +  }.getOrElse {
    +    if (sqlContext.conf.partitionDiscoveryEnabled()) {
    +      discoverPartitions()
    +    } else {
    +      PartitionSpec(StructType(Nil), Array.empty[Partition])
    +    }
    +  }
    +
    +  private[sql] def partitionSpec: PartitionSpec = _partitionSpec
    +
    +  /**
    +   * Partition columns. Note that they are always nullable.
    +   */
    +  def partitionColumns: StructType = partitionSpec.partitionColumns
    +
    +  private[sql] def refresh(): Unit = {
    +    if (sqlContext.conf.partitionDiscoveryEnabled()) {
    +      _partitionSpec = discoverPartitions()
    +    }
    +  }
    +
    +  private def discoverPartitions(): PartitionSpec = {
    +    val basePaths = paths.map(new Path(_))
    +    val leafDirs = basePaths.flatMap { path =>
    +      val fs = path.getFileSystem(hadoopConf)
    +      Try(fs.getFileStatus(path.makeQualified(fs.getUri, 
fs.getWorkingDirectory)))
    +        .filter(_.isDir)
    +        .map(SparkHadoopUtil.get.listLeafDirStatuses(fs, _))
    +        .getOrElse(Seq.empty[FileStatus])
    +    }.map(_.getPath)
    +
    +    if (leafDirs.nonEmpty) {
    +      PartitioningUtils.parsePartitions(leafDirs, 
"__HIVE_DEFAULT_PARTITION__")
    +    } else {
    +      PartitionSpec(StructType(Array.empty[StructField]), 
Array.empty[Partition])
    +    }
    +  }
    +
    +  /**
    +   * Schema of this relation.  It consists of columns appearing in 
[[dataSchema]] and all partition
    +   * columns not appearing in [[dataSchema]].
    +   */
    +  override lazy val schema: StructType = {
    +    val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
    +    StructType(dataSchema ++ partitionSpec.partitionColumns.filterNot { 
column =>
    +      dataSchemaColumnNames.contains(column.name.toLowerCase)
    +    })
    +  }
    +
    +  /**
    +   * Specifies schema of actual data files.  For partitioned relations, if 
one or more partitioned
    +   * columns are contained in the data files, they should also appear in 
`dataSchema`.
    +   */
    +  def dataSchema: StructType
    +
    +  /**
    +   * For a non-partitioned relation, this method builds an `RDD[Row]` 
containing all rows within
    +   * this relation. For partitioned relations, this method is called for 
each selected partition,
    +   * and builds an `RDD[Row]` containing all rows within that single 
partition.
    +   *
    +   * @param inputPaths For a non-partitioned relation, it contains paths 
of all data files in the
    +   *        relation. For a partitioned relation, it contains paths of all 
data files in a single
    +   *        selected partition.
    +   */
    +  def buildScan(inputPaths: Array[String]): RDD[Row] = {
    +    throw new RuntimeException(
    +      "At least one buildScan() method should be overridden to read the 
relation.")
    +  }
    +
    +  /**
    +   * For a non-partitioned relation, this method builds an `RDD[Row]` 
containing all rows within
    +   * this relation. For partitioned relations, this method is called for 
each selected partition,
    +   * and builds an `RDD[Row]` containing all rows within that single 
partition.
    +   *
    +   * @param requiredColumns Required columns.
    +   * @param inputPaths For a non-partitioned relation, it contains paths 
of all data files in the
    +   *        relation. For a partitioned relation, it contains paths of all 
data files in a single
    +   *        selected partition.
    +   */
    +  def buildScan(requiredColumns: Array[String], inputPaths: 
Array[String]): RDD[Row] = {
    +    // Yeah, to workaround serialization...
    +    val dataSchema = this.dataSchema
    +    val codegenEnabled = this.codegenEnabled
    +
    +    val requiredOutput = requiredColumns.map { col =>
    +      val field = dataSchema(col)
    +      BoundReference(dataSchema.fieldIndex(col), field.dataType, 
field.nullable)
    +    }.toSeq
    +
    +    val buildProjection = if (codegenEnabled) {
    +      GenerateMutableProjection.generate(requiredOutput, 
dataSchema.toAttributes)
    --- End diff --
    
    The projection needs to be generated inside of `mapPartitions`, as generated code is not 
serializable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to