GitHub user tedyu commented on a diff in the pull request:
https://github.com/apache/spark/pull/12361#discussion_r60339109
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
---
@@ -0,0 +1,530 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import scala.collection.mutable
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.FileRelation
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * ::Experimental::
+ * A factory that produces [[OutputWriter]]s. A new [[OutputWriterFactory]] is created on driver
+ * side for each write job issued when writing to a [[HadoopFsRelation]], and then gets serialized
+ * to executor side to create actual [[OutputWriter]]s on the fly.
+ *
+ * @since 1.4.0
+ */
+@Experimental
+abstract class OutputWriterFactory extends Serializable {
+ /**
+ * When writing to a [[HadoopFsRelation]], this method gets called by each task on executor side
+ * to instantiate new [[OutputWriter]]s.
+ *
+ * @param path Path of the file to which this [[OutputWriter]] is supposed to write. Note that
+ * this may not point to the final output file. For example, `FileOutputFormat` writes to
+ * temporary directories and then merges written files back to the final destination. In
+ * this case, `path` points to a temporary output file under the temporary directory.
+ * @param dataSchema Schema of the rows to be written. Partition columns are not included in the
+ * schema if the relation being written is partitioned.
+ * @param context The Hadoop MapReduce task context.
+ * @since 1.4.0
+ */
+ private[sql] def newInstance(
+ path: String,
+ bucketId: Option[Int], // TODO: This doesn't belong here...
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter
+}
+
+/**
+ * ::Experimental::
+ * [[OutputWriter]] is used together with [[HadoopFsRelation]] for persisting rows to the
+ * underlying file system. Subclasses of [[OutputWriter]] must provide a zero-argument constructor.
+ * An [[OutputWriter]] instance is created and initialized when a new output file is opened on
+ * executor side. This instance is used to persist rows to this single output file.
+ *
+ * @since 1.4.0
+ */
+@Experimental
+abstract class OutputWriter {
+ /**
+ * Persists a single row. Invoked on the executor side. When writing to dynamically partitioned
+ * tables, dynamic partition columns are not included in rows to be written.
+ *
+ * @since 1.4.0
+ */
+ def write(row: Row): Unit
+
+ /**
+ * Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted, before
+ * the task output is committed.
+ *
+ * @since 1.4.0
+ */
+ def close(): Unit
+
+ private var converter: InternalRow => Row = _
+
+ protected[sql] def initConverter(dataSchema: StructType) = {
+ converter =
+ CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
+ }
+
+ protected[sql] def writeInternal(row: InternalRow): Unit = {
+ write(converter(row))
+ }
+}
+
+/**
+ * Acts as a container for all of the metadata required to read from a datasource. All discovery,
+ * resolution and merging logic for schemas and partitions has been removed.
+ *
+ * @param location A [[FileCatalog]] that can enumerate the locations of all the files that comprise
+ * this relation.
+ * @param partitionSchema The schema of the columns (if any) that are used to partition the relation
+ * @param dataSchema The schema of any remaining columns. Note that if any partition columns are
+ * present in the actual data files as well, they are preserved.
+ * @param bucketSpec Describes the bucketing (hash-partitioning of the files by some column values).
+ * @param fileFormat A file format that can be used to read and write the data in files.
+ * @param options Configuration used when reading / writing data.
+ */
+case class HadoopFsRelation(
+ sqlContext: SQLContext,
+ location: FileCatalog,
+ partitionSchema: StructType,
+ dataSchema: StructType,
+ bucketSpec: Option[BucketSpec],
+ fileFormat: FileFormat,
+ options: Map[String, String]) extends BaseRelation with FileRelation {
+
+ val schema: StructType = {
+ val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
+ StructType(dataSchema ++ partitionSchema.filterNot { column =>
+ dataSchemaColumnNames.contains(column.name.toLowerCase)
+ })
+ }
+
+ def partitionSchemaOption: Option[StructType] =
+ if (partitionSchema.isEmpty) None else Some(partitionSchema)
+ def partitionSpec: PartitionSpec = location.partitionSpec()
+
+ def refresh(): Unit = location.refresh()
+
+ override def toString: String =
+ s"HadoopFiles"
+
+ /** Returns the list of files that will be read when scanning this relation. */
+ override def inputFiles: Array[String] =
+ location.allFiles().map(_.getPath.toUri.toString).toArray
+
+ override def sizeInBytes: Long = location.allFiles().map(_.getLen).sum
+}
+
+/**
+ * Used to read and write data stored in files to/from the [[InternalRow]] format.
+ */
+trait FileFormat {
+ /**
+ * When possible, this method should return the schema of the given `files`. When the format
+ * does not support inference, or no valid files are given, it should return None. In these cases
+ * Spark will require that the user specify the schema manually.
+ */
+ def inferSchema(
+ sqlContext: SQLContext,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType]
+
+ /**
+ * Prepares a read job and returns a potentially updated data source option [[Map]]. This method
+ * can be useful for collecting necessary global information for scanning input data.
+ */
+ def prepareRead(
+ sqlContext: SQLContext,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Map[String, String] = options
+
+ /**
+ * Prepares a write job and returns an [[OutputWriterFactory]]. Client-side job preparation can
+ * be put here. For example, a user-defined output committer can be configured here
+ * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
+ */
+ def prepareWrite(
+ sqlContext: SQLContext,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory
+
+ /**
+ * Returns whether this format supports returning columnar batches or not.
+ *
+ * TODO: we should just have different traits for the different formats.
+ */
+ def supportBatch(sqlContext: SQLContext, dataSchema: StructType): Boolean = {
+ false
+ }
+
+ /**
+ * Returns a function that can be used to read a single file in as an Iterator of InternalRow.
+ *
+ * @param dataSchema The global data schema. It can be either specified by the user, or
+ * reconciled/merged from all underlying data files. If any partition columns
+ * are contained in the files, they are preserved in this schema.
+ * @param partitionSchema The schema of the partition column row that will be present in each
+ * PartitionedFile. These columns should be appended to the rows that
+ * are produced by the iterator.
+ * @param requiredSchema The schema of the data that should be output for each row. This may be a
+ * subset of the columns that are present in the file if column pruning has
+ * occurred.
+ * @param filters A set of filters that can optionally be used to reduce the number of rows output.
+ * @param options A set of string -> string configuration options.
+ * @return
+ */
+ def buildReader(
+ sqlContext: SQLContext,
--- End diff --
Do we need @param for this?
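
For illustration, here is a minimal, hypothetical sketch of how the FileFormat / OutputWriterFactory / OutputWriter contract documented in this diff could fit together. The names SimpleTextFileFormat, SimpleTextOutputWriterFactory and SimpleTextOutputWriter, and the comma-joined row output, are assumptions made for this example and are not part of the patch; the format class is left abstract because buildReader is truncated above.

package org.apache.spark.sql.execution.datasources

import java.io.{BufferedWriter, OutputStreamWriter}

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical format that stores one string column per line. Left abstract
// because buildReader (truncated in the diff above) would also need to be
// implemented.
abstract class SimpleTextFileFormat extends FileFormat {

  // Every file has the same fixed schema, so no real inference is needed.
  override def inferSchema(
      sqlContext: SQLContext,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] = {
    if (files.isEmpty) None else Some(StructType(StructField("value", StringType) :: Nil))
  }

  // Driver-side preparation: hand back a factory that executors use to open writers.
  override def prepareWrite(
      sqlContext: SQLContext,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {
    new SimpleTextOutputWriterFactory
  }
}

// Created on the driver, serialized to executors, and asked for one
// OutputWriter per output file.
class SimpleTextOutputWriterFactory extends OutputWriterFactory {
  override private[sql] def newInstance(
      path: String,
      bucketId: Option[Int],
      dataSchema: StructType,
      context: TaskAttemptContext): OutputWriter = {
    // `path` may point to a temporary file that the committer later
    // moves to the final destination.
    new SimpleTextOutputWriter(path, context)
  }
}

// Persists the rows of a single output file on an executor.
class SimpleTextOutputWriter(path: String, context: TaskAttemptContext)
  extends OutputWriter {

  private val writer = {
    val hadoopPath = new Path(path)
    val fs = hadoopPath.getFileSystem(context.getConfiguration)
    new BufferedWriter(new OutputStreamWriter(fs.create(hadoopPath, false), "UTF-8"))
  }

  // Dynamic partition columns are already stripped from `row` by the caller.
  override def write(row: Row): Unit = {
    writer.write(row.mkString(","))
    writer.newLine()
  }

  // Called after all rows are written, before the task output is committed.
  override def close(): Unit = writer.close()
}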