GitHub user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5526#discussion_r28975672
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala ---
    @@ -197,3 +233,69 @@ trait InsertableRelation {
     trait CatalystScan {
       def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
     }
    +
    +/**
    + * ::Experimental::
    + * [[OutputWriter]] is used together with [[FSBasedRelation]] for persisting rows to the
    + * underlying file system.  An [[OutputWriter]] instance is created when a new output file is
    + * opened.  This instance is used to persist rows to this single output file.
    + */
    +@Experimental
    +trait OutputWriter {
    +  /**
    +   * Persists a single row.  Invoked on the executor side.
    +   */
    +  def write(row: Row): Unit
    +
    +  /**
    +   * Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted, before
    +   * the task output is committed.
    +   */
    +  def close(): Unit
    +}
    +
    +/**
    + * ::Experimental::
    + * A [[BaseRelation]] that abstracts file system based data sources.
    + *
    + * For the read path, similar to [[PrunedFilteredScan]], it can eliminate unneeded columns and
    + * filter using selected predicates before producing an RDD containing all matching tuples as
    + * [[Row]] objects.
    + *
    + * In addition, when reading from Hive style partitioned tables stored in file systems, it's able to
    + * discover partitioning information from the paths of input directories, and perform partition
    + * pruning before starting to read the data.
    + *
    + * For the write path, it provides the ability to write to both non-partitioned and partitioned
    + * tables.  Directory layout of the partitioned tables is compatible with Hive.
    + */
    +@Experimental
    +trait FSBasedRelation extends BaseRelation {
    +  /**
    +   * Builds an `RDD[Row]` containing all rows within this relation.
    +   *
    +   * @param requiredColumns Required columns.
    +   * @param filters Candidate filters to be pushed down. The actual filter should be the conjunction
    +   *        of all `filters`.  The pushed down filters are currently purely an optimization as they
    +   *        will all be evaluated again. This means it is safe to use them with methods that produce
    +   *        false positives such as filtering partitions based on a bloom filter.
    +   * @param inputPaths Data files to be read. If the underlying relation is partitioned, only data
    +   *        files within required partition directories are included.
    +   */
    +  def buildScan(
    +      requiredColumns: Array[String],
    +      filters: Array[Filter],
    +      inputPaths: Array[String]): RDD[Row]
    +
    +  /**
    +   * When writing rows to this relation, this method is invoked on the driver side before the actual
    +   * write job is issued.  It provides an opportunity to configure the write job to be performed.
    +   */
    +  def prepareForWrite(conf: Configuration): Unit
    +
    +  /**
    +   * This method is responsible for producing a new [[OutputWriter]] for each newly opened output
    +   * file on the executor side.
    +   */
    +  def newOutputWriter(path: String): OutputWriter
    --- End diff --
    
    One issue here is about passing the driver-side Hadoop configuration to
    OutputWriters on the executor side. Users may set properties on the
    driver-side Hadoop configuration (e.g.
    `mapreduce.fileoutputcommitter.marksuccessfuljobs`), and we should inherit
    these settings on the executor side when writing data. A zero-arg
    constructor plus `init(...)` is a good way to avoid forcing `BaseRelation`
    to be serializable, but I guess we have to put a `Configuration` as an
    argument of `OutputWriter.init(...)`. This couples the data sources API
    with the Hadoop API via `Configuration`, but I guess this should be more
    acceptable compared to forcing `BaseRelation` subclasses to be serializable?
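    To illustrate the zero-arg-constructor-plus-`init(...)` pattern discussed above, here is a
    self-contained sketch. `Configuration`, `Row`, `OutputWriter`, and `BufferingWriter` below
    are stand-ins and hypothetical names, not the actual Spark/Hadoop classes; the point is only
    that a writer needs nothing but its class name to reach the executor, with driver-side
    settings arriving through `init(...)`.

```scala
import scala.collection.mutable

// Stand-in for org.apache.hadoop.conf.Configuration (assumption, not the real class).
class Configuration {
  private val props = mutable.Map.empty[String, String]
  def set(k: String, v: String): Unit = props(k) = v
  def get(k: String): String = props.getOrElse(k, null)
}

// Stand-in for Spark's Row.
type Row = Seq[Any]

trait OutputWriter {
  // Hypothetical init(...): invoked on the executor side right after the
  // zero-arg constructor, so driver-side settings are inherited without
  // making the relation (or the writer) serializable.
  def init(path: String, conf: Configuration): Unit
  def write(row: Row): Unit
  def close(): Unit
}

// A writer implementation only needs a zero-arg constructor; the output
// path and configuration arrive through init(...).
class BufferingWriter extends OutputWriter {
  var path: String = _
  var markSuccess: Boolean = _
  val rows = mutable.Buffer.empty[Row]

  def init(path: String, conf: Configuration): Unit = {
    this.path = path
    markSuccess =
      conf.get("mapreduce.fileoutputcommitter.marksuccessfuljobs") == "true"
  }
  def write(row: Row): Unit = rows += row
  def close(): Unit = ()
}

// Driver side sets a property; the "executor" side instantiates the writer
// reflectively from its class and calls init with the shipped configuration.
val conf = new Configuration
conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true")

val writer = classOf[BufferingWriter].getDeclaredConstructor().newInstance()
writer.init("/tmp/part-00000", conf)
writer.write(Seq(1, "a"))
writer.close()
println(writer.markSuccess)  // the driver-side setting is visible here
```

    With this shape, only the writer's class name (a `String`) and the `Configuration` have to
    travel to executors, which keeps `BaseRelation` itself out of task closures.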

