Github user liancheng commented on a diff in the pull request:
https://github.com/apache/spark/pull/5526#discussion_r29946369
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala ---
@@ -207,3 +247,203 @@ trait InsertableRelation {
trait CatalystScan {
  def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
}
+
+/**
+ * ::Experimental::
+ * [[OutputWriter]] is used together with [[FSBasedRelation]] for persisting rows to the
+ * underlying file system. Subclasses of [[OutputWriter]] must provide a zero-argument constructor.
+ * An [[OutputWriter]] instance is created and initialized when a new output file is opened on the
+ * executor side. This instance is used to persist rows to this single output file.
+ */
+@Experimental
+abstract class OutputWriter {
+  /**
+   * Initializes this [[OutputWriter]] before any rows are persisted.
+   *
+   * @param path Path of the file to which this [[OutputWriter]] is supposed to write. Note that
+   *        this may not point to the final output file. For example, `FileOutputFormat` writes to
+   *        temporary directories and then merges written files back to the final destination. In
+   *        this case, `path` points to a temporary output file under the temporary directory.
+   * @param dataSchema Schema of the rows to be written. Partition columns are not included in the
+   *        schema if the corresponding relation is partitioned.
+   * @param context The Hadoop MapReduce task context.
+   */
+  def init(
+      path: String,
+      dataSchema: StructType,
+      context: TaskAttemptContext): Unit = ()
+
+  /**
+   * Persists a single row. Invoked on the executor side. When writing to dynamically partitioned
+   * tables, dynamic partition columns are not included in rows to be written.
+   */
+  def write(row: Row): Unit
+
+  /**
+   * Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted, before
+   * the task output is committed.
+   */
+  def close(): Unit = ()
+}
+
+/**
+ * ::Experimental::
+ * A [[BaseRelation]] that abstracts file system based data sources.
+ *
+ * For the read path, similar to [[PrunedFilteredScan]], it can eliminate unneeded columns and
+ * filter using selected predicates before producing an RDD containing all matching tuples as
+ * [[Row]] objects. In addition, when reading from Hive style partitioned tables stored in file
+ * systems, it's able to discover partitioning information from the paths of input directories, and
+ * perform partition pruning before starting to read the data. Subclasses of [[FSBasedRelation]]
+ * must override one of the three `buildScan` methods to implement the read path.
+ *
+ * For the write path, it provides the ability to write to both non-partitioned and partitioned
+ * tables. The directory layout of partitioned tables is compatible with Hive.
+ *
+ * @constructor This constructor is for internal uses only. The [[PartitionSpec]] argument is for
+ *              implementing metastore table conversion.
+ * @param paths Base paths of this relation. For partitioned relations, these should be the root
+ *        directories of all partition directories.
+ * @param maybePartitionSpec An [[FSBasedRelation]] can be created with an optional
+ *        [[PartitionSpec]], so that partition discovery can be skipped.
+ */
+@Experimental
+abstract class FSBasedRelation private[sql](
+    val paths: Array[String],
+    maybePartitionSpec: Option[PartitionSpec])
+  extends BaseRelation {
+
+  /**
+   * Constructs an [[FSBasedRelation]].
+   *
+   * @param paths Base paths of this relation. For partitioned relations, these should be the root
+   *        directories of all partition directories.
+   * @param partitionColumns Partition columns of this relation.
+   */
+  def this(paths: Array[String], partitionColumns: StructType) =
+    this(paths, {
+      if (partitionColumns.isEmpty) None
+      else Some(PartitionSpec(partitionColumns, Array.empty[Partition]))
+    })
+
+  /**
+   * Constructs an [[FSBasedRelation]].
+   *
+   * @param paths Base paths of this relation. For partitioned relations, these should be the root
+   *        directories of all partition directories.
+   */
+  def this(paths: Array[String]) = this(paths, None)
+
+  private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
--- End diff --
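(Aside, just to make the `OutputWriter` contract quoted above a bit more concrete: a minimal implementation could look roughly like the sketch below. `SimpleTextOutputWriter` and everything inside it is made up for illustration and is not part of this PR; it only assumes the class lands in `org.apache.spark.sql.sources` as the file path suggests.)

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.OutputWriter
import org.apache.spark.sql.types.StructType

// Made-up example: serializes each row as a comma-separated line of text.
// Note the zero-argument constructor required by the contract.
class SimpleTextOutputWriter extends OutputWriter {
  private var out: java.io.OutputStream = _

  override def init(
      path: String,
      dataSchema: StructType,
      context: TaskAttemptContext): Unit = {
    // `path` may point to a temporary file; the output committer moves it later.
    val file = new Path(path)
    val fs = file.getFileSystem(context.getConfiguration)
    out = fs.create(file, false)
  }

  override def write(row: Row): Unit = {
    // Dynamic partition column values are already stripped from `row` by the caller.
    out.write((row.toSeq.mkString(",") + "\n").getBytes("UTF-8"))
  }

  override def close(): Unit = {
    // Release the file handle before the task output is committed.
    if (out != null) out.close()
  }
}
```

Per the contract above, one such instance would be created per output file on the executor side.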
As discussed offline, this `hadoopConf` initialization can be addressed in a separate PR. A possible approach (sketched below) could be:

1. Have a `hadoopConf` field in `SQLContext`
2. Override `SQLContext.hadoopConf` in `HiveContext` with a `HiveConf`
3. When calling `SQLContext.setConf`, always set both `hadoopConf` and `conf: SQLConf`

Note that multiple session support needs to be considered here.
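Roughly what I have in mind, as a hand-wavy sketch (the `*Sketch` class names and the mutable map standing in for `SQLConf` are made up for illustration only; multi-session handling is deliberately ignored here):

```scala
import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf

// Stand-in for SQLContext: owns a Hadoop configuration seeded from the one
// held by SparkContext, so data sources no longer need to build their own copy.
class SQLContextSketch(sparkHadoopConf: Configuration) {
  protected def newHadoopConf(): Configuration = new Configuration(sparkHadoopConf)

  // (1) The `hadoopConf` field proposed above.
  lazy val hadoopConf: Configuration = newHadoopConf()

  // Stand-in for `conf: SQLConf`.
  protected val sqlSettings = mutable.HashMap.empty[String, String]

  // (3) setConf keeps the SQL conf and the Hadoop conf in sync.
  def setConf(key: String, value: String): Unit = {
    sqlSettings(key) = value
    hadoopConf.set(key, value)
  }
}

// (2) The Hive flavour overrides the factory so that `hadoopConf` is a HiveConf.
class HiveContextSketch(sparkHadoopConf: Configuration) extends SQLContextSketch(sparkHadoopConf) {
  override protected def newHadoopConf(): Configuration =
    new HiveConf(sparkHadoopConf, getClass)
}
```

With something along these lines in place, `FSBasedRelation` could simply read `sqlContext.hadoopConf` instead of constructing its own `Configuration`.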