Repository: spark
Updated Branches:
  refs/heads/master 8543996c3 -> 6c29b3de7


[SPARK-17925][SQL] Break fileSourceInterfaces.scala into multiple pieces

## What changes were proposed in this pull request?
This patch does a few changes to the file structure of data sources:

- Break fileSourceInterfaces.scala into multiple pieces (HadoopFsRelation, FileFormat, OutputWriter)
- Move ParquetOutputWriter into its own file

I created this as a separate patch so it will be easier to review my future PRs that focus
on refactoring this internal logic. This patch only moves code around and makes no logic changes.
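
Because every class stays in the `org.apache.spark.sql.execution.datasources` package, callers are
unaffected by the move. As a minimal illustration (the object below is hypothetical and not part of
the patch):

    import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, OutputWriter}

    // Hypothetical compile-time check: the types still resolve under their old
    // fully qualified names because only source files were split, not packages.
    object FileSourceMoveCheck {
      def main(args: Array[String]): Unit = {
        println(classOf[FileFormat].getName)
        println(classOf[HadoopFsRelation].getName)
        println(classOf[OutputWriter].getName)
      }
    }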

## How was this patch tested?
N/A - should be covered by existing tests.

Author: Reynold Xin <r...@databricks.com>

Closes #15473 from rxin/SPARK-17925.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c29b3de
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c29b3de
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c29b3de

Branch: refs/heads/master
Commit: 6c29b3de763115d8676ed91f896e75c490e8c5b2
Parents: 8543996
Author: Reynold Xin <r...@databricks.com>
Authored: Fri Oct 14 14:14:52 2016 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Oct 14 14:14:52 2016 +0800

----------------------------------------------------------------------
 .../sql/execution/datasources/FileFormat.scala  | 212 +++++++++++
 .../datasources/HadoopFsRelation.scala          |  77 ++++
 .../execution/datasources/OutputWriter.scala    | 101 ++++++
 .../datasources/fileSourceInterfaces.scala      | 349 -------------------
 .../datasources/parquet/ParquetFileFormat.scala | 144 --------
 .../parquet/ParquetOutputWriter.scala           | 178 ++++++++++
 6 files changed, 568 insertions(+), 493 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6c29b3de/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
new file mode 100644
index 0000000..bde2d2b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
+import org.apache.hadoop.mapreduce.Job
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Used to read and write data stored in files to/from the [[InternalRow]] format.
+ */
+trait FileFormat {
+  /**
+   * When possible, this method should return the schema of the given `files`.  When the format
+   * does not support inference, or when no valid files are given, it should return None.  In
+   * these cases Spark will require that the user specify the schema manually.
+   */
+  def inferSchema(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType]
+
+  /**
+   * Prepares a write job and returns an [[OutputWriterFactory]].  Client-side job preparation can
+   * be put here.  For example, a user-defined output committer can be configured here by setting
+   * the output committer class in the conf key spark.sql.sources.outputCommitterClass.
+   */
+  def prepareWrite(
+      sparkSession: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory
+
+  /**
+   * Returns an [[OutputWriterFactory]] for generating output writers that can write data.
+   * This method is currently used only by FileStreamSinkWriter to generate output writers that
+   * do not use output committers to write data. The [[OutputWriter]] generated by the returned
+   * [[OutputWriterFactory]] must implement the method `newWriter(path)`.
+   */
+  def buildWriter(
+      sqlContext: SQLContext,
+      dataSchema: StructType,
+      options: Map[String, String]): OutputWriterFactory = {
+    // TODO: Remove this default implementation when the other formats have been ported
+    throw new UnsupportedOperationException(s"buildWriter is not supported for $this")
+  }
+
+  /**
+   * Returns whether this format supports returning a columnar batch or not.
+   *
+   * TODO: we should just have different traits for the different formats.
+   */
+  def supportBatch(sparkSession: SparkSession, dataSchema: StructType): Boolean = {
+    false
+  }
+
+  /**
+   * Returns whether a file with `path` can be split or not.
+   */
+  def isSplitable(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      path: Path): Boolean = {
+    false
+  }
+
+  /**
+   * Returns a function that can be used to read a single file in as an Iterator of InternalRow.
+   *
+   * @param dataSchema The global data schema. It can be either specified by the user, or
+   *                   reconciled/merged from all underlying data files. If any partition columns
+   *                   are contained in the files, they are preserved in this schema.
+   * @param partitionSchema The schema of the partition column row that will be present in each
+   *                        PartitionedFile. These columns should be appended to the rows that
+   *                        are produced by the iterator.
+   * @param requiredSchema The schema of the data that should be output for each row.  This may be
+   *                       a subset of the columns that are present in the file if column pruning
+   *                       has occurred.
+   * @param filters A set of filters that can optionally be used to reduce the number of rows output
+   * @param options A set of string -> string configuration options.
+   * @return a function that maps a [[PartitionedFile]] to an iterator over its rows
+   */
+  def buildReader(
+      sparkSession: SparkSession,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String],
+      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    // TODO: Remove this default implementation when the other formats have been ported.
+    // Until then we guard in [[FileSourceStrategy]] to only call this method on supported formats.
+    throw new UnsupportedOperationException(s"buildReader is not supported for $this")
+  }
+
+  /**
+   * Exactly the same as [[buildReader]], except that the reader function returned by this method
+   * appends partition values to the [[InternalRow]]s produced by the reader function that
+   * [[buildReader]] returns.
+   */
+  def buildReaderWithPartitionValues(
+      sparkSession: SparkSession,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String],
+      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    val dataReader = buildReader(
+      sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
+
+    new (PartitionedFile => Iterator[InternalRow]) with Serializable {
+      private val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+
+      private val joinedRow = new JoinedRow()
+
+      // Using lazy val to avoid serialization
+      private lazy val appendPartitionColumns =
+        GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+      override def apply(file: PartitionedFile): Iterator[InternalRow] = {
+        // Using a local val to avoid a per-row lazy val check (premature optimization?...)
+        val converter = appendPartitionColumns
+
+        // Note that we have to apply the converter even if `file.partitionValues` is empty.
+        // This is because the converter is also responsible for converting safe `InternalRow`s
+        // into `UnsafeRow`s.
+        dataReader(file).map { dataRow =>
+          converter(joinedRow(dataRow, file.partitionValues))
+        }
+      }
+    }
+  }
+
+}
+
+/**
+ * The base class for file formats that are based on text files.
+ */
+abstract class TextBasedFileFormat extends FileFormat {
+  private var codecFactory: CompressionCodecFactory = null
+  override def isSplitable(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      path: Path): Boolean = {
+    if (codecFactory == null) {
+      codecFactory = new CompressionCodecFactory(
+        sparkSession.sessionState.newHadoopConfWithOptions(options))
+    }
+    val codec = codecFactory.getCodec(path)
+    codec == null || codec.isInstanceOf[SplittableCompressionCodec]
+  }
+}
+
+/**
+ * A collection of data files from a partitioned relation, along with the partition values in the
+ * form of an [[InternalRow]].
+ */
+case class Partition(values: InternalRow, files: Seq[FileStatus])
+
+/**
+ * An interface for objects capable of enumerating the files that comprise a relation as well
+ * as the partitioning characteristics of those files.
+ */
+trait FileCatalog {
+
+  /** Returns the list of input paths from which the catalog will get files. */
+  def paths: Seq[Path]
+
+  /** Returns the specification of the partitions inferred from the data. */
+  def partitionSpec(): PartitionSpec
+
+  /**
+   * Returns all valid files grouped into partitions when the data is partitioned. If the data is
+   * unpartitioned, this will return a single partition with no partition values.
+   *
+   * @param filters The filters used to prune which partitions are returned.  These filters must
+   *                only refer to partition columns and this method will only return files
+   *                where these predicates are guaranteed to evaluate to `true`.  Thus, these
+   *                filters will not need to be evaluated again on the returned data.
+   */
+  def listFiles(filters: Seq[Expression]): Seq[Partition]
+
+  /** Returns all the valid files. */
+  def allFiles(): Seq[FileStatus]
+
+  /** Refresh the file listing */
+  def refresh(): Unit
+}
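
To make the contract above concrete, here is a minimal, hypothetical read-only format written
against the interfaces in this file. It is a sketch only: the class name is invented, the write
path is left unimplemented, split offsets are ignored, and partition values are assumed to be
appended by `buildReaderWithPartitionValues`.

    package org.apache.spark.sql.execution.datasources

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileStatus, Path}
    import org.apache.hadoop.mapreduce.Job

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.sources.Filter
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.unsafe.types.UTF8String
    import org.apache.spark.util.SerializableConfiguration

    // Hypothetical format: every line of every file becomes a single string column "value".
    class WholeLineTextFileFormat extends TextBasedFileFormat {

      // The schema never depends on the files, so inference always succeeds.
      override def inferSchema(
          sparkSession: SparkSession,
          options: Map[String, String],
          files: Seq[FileStatus]): Option[StructType] =
        Some(StructType(StructField("value", StringType) :: Nil))

      // Write path intentionally not sketched here.
      override def prepareWrite(
          sparkSession: SparkSession,
          job: Job,
          options: Map[String, String],
          dataSchema: StructType): OutputWriterFactory =
        throw new UnsupportedOperationException("write path not sketched")

      override def buildReader(
          sparkSession: SparkSession,
          dataSchema: StructType,
          partitionSchema: StructType,
          requiredSchema: StructType,
          filters: Seq[Filter],
          options: Map[String, String],
          hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
        // Wrap the Hadoop conf so the returned closure stays serializable.
        val confBroadcast =
          sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

        (file: PartitionedFile) => {
          val path = new Path(file.filePath)
          val fs = path.getFileSystem(confBroadcast.value.value)
          // For brevity this reads whole files and ignores `file.start` / `file.length`.
          scala.io.Source.fromInputStream(fs.open(path), "UTF-8").getLines()
            .map(line => InternalRow(UTF8String.fromString(line)))
        }
      }
    }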

http://git-wip-us.apache.org/repos/asf/spark/blob/6c29b3de/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
new file mode 100644
index 0000000..c7ebe0b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.execution.FileRelation
+import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
+import org.apache.spark.sql.types.StructType
+
+
+/**
+ * Acts as a container for all of the metadata required to read from a datasource. All discovery,
+ * resolution and merging logic for schemas and partitions has been removed.
+ *
+ * @param location A [[FileCatalog]] that can enumerate the locations of all the files that
+ *                 comprise this relation.
+ * @param partitionSchema The schema of the columns (if any) that are used to partition the relation
+ * @param dataSchema The schema of any remaining columns.  Note that if any partition columns are
+ *                   present in the actual data files as well, they are preserved.
+ * @param bucketSpec Describes the bucketing (hash-partitioning of the files by some column values).
+ * @param fileFormat A file format that can be used to read and write the data in files.
+ * @param options Configuration used when reading / writing data.
+ */
+case class HadoopFsRelation(
+    location: FileCatalog,
+    partitionSchema: StructType,
+    dataSchema: StructType,
+    bucketSpec: Option[BucketSpec],
+    fileFormat: FileFormat,
+    options: Map[String, String])(val sparkSession: SparkSession)
+  extends BaseRelation with FileRelation {
+
+  override def sqlContext: SQLContext = sparkSession.sqlContext
+
+  val schema: StructType = {
+    val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
+    StructType(dataSchema ++ partitionSchema.filterNot { column =>
+      dataSchemaColumnNames.contains(column.name.toLowerCase)
+    })
+  }
+
+  def partitionSchemaOption: Option[StructType] =
+    if (partitionSchema.isEmpty) None else Some(partitionSchema)
+
+  def partitionSpec: PartitionSpec = location.partitionSpec()
+
+  def refresh(): Unit = location.refresh()
+
+  override def toString: String = {
+    fileFormat match {
+      case source: DataSourceRegister => source.shortName()
+      case _ => "HadoopFiles"
+    }
+  }
+
+  /** Returns the list of files that will be read when scanning this relation. */
+  override def inputFiles: Array[String] =
+    location.allFiles().map(_.getPath.toUri.toString).toArray
+
+  override def sizeInBytes: Long = location.allFiles().map(_.getLen).sum
+}
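
The `schema` val above keeps the data columns and appends only those partition columns whose names
are not already present (compared case-insensitively). A standalone sketch of that merge rule, with
the helper object and sample schemas being illustrative only:

    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object SchemaMergeSketch {
      // Mirrors HadoopFsRelation.schema: data columns first, then any partition
      // column whose lower-cased name is not already present in the data schema.
      def mergedSchema(dataSchema: StructType, partitionSchema: StructType): StructType = {
        val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
        StructType(dataSchema ++ partitionSchema.filterNot { column =>
          dataSchemaColumnNames.contains(column.name.toLowerCase)
        })
      }

      def main(args: Array[String]): Unit = {
        val data = StructType(Seq(StructField("id", IntegerType), StructField("Year", StringType)))
        val parts = StructType(Seq(StructField("year", StringType), StructField("month", StringType)))
        // Prints "id, Year, month": "year" is skipped because "Year" already covers it.
        println(mergedSchema(data, parts).fieldNames.mkString(", "))
      }
    }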

http://git-wip-us.apache.org/repos/asf/spark/blob/6c29b3de/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala
new file mode 100644
index 0000000..d2eec7b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.types.StructType
+
+
+/**
+ * A factory that produces [[OutputWriter]]s.  A new [[OutputWriterFactory]] is created on the
+ * driver side for each write job issued when writing to a [[HadoopFsRelation]], and then gets
+ * serialized to the executor side to create actual [[OutputWriter]]s on the fly.
+ */
+abstract class OutputWriterFactory extends Serializable {
+  /**
+   * When writing to a [[HadoopFsRelation]], this method gets called by each task on the executor
+   * side to instantiate new [[OutputWriter]]s.
+   *
+   * @param path Path of the file to which this [[OutputWriter]] is supposed to write.  Note that
+   *        this may not point to the final output file.  For example, `FileOutputFormat` writes
+   *        to temporary directories and then merges written files back to the final destination.
+   *        In this case, `path` points to a temporary output file under the temporary directory.
+   * @param dataSchema Schema of the rows to be written. Partition columns are not included in the
+   *        schema if the relation being written is partitioned.
+   * @param context The Hadoop MapReduce task context.
+   * @since 1.4.0
+   */
+  def newInstance(
+      path: String,
+      bucketId: Option[Int], // TODO: This doesn't belong here...
+      dataSchema: StructType,
+      context: TaskAttemptContext): OutputWriter
+
+  /**
+   * Returns a new instance of [[OutputWriter]] that will write data to the given path.
+   * This method gets called by each task on the executor to write InternalRows to
+   * format-specific files. Compared to the other `newInstance()`, this is a newer API that
+   * passes only the path that the writer must write to. The writer must write to the exact path
+   * and not modify it (do not add subdirectories, extensions, etc.). All other
+   * file-format-specific information needed to create the writer must be passed
+   * through the [[OutputWriterFactory]] implementation.
+   * @since 2.0.0
+   */
+  def newWriter(path: String): OutputWriter = {
+    throw new UnsupportedOperationException("newInstance with just path not supported")
+  }
+}
+
+
+/**
+ * [[OutputWriter]] is used together with [[HadoopFsRelation]] for persisting rows to the
+ * underlying file system.  Subclasses of [[OutputWriter]] must provide a zero-argument constructor.
+ * An [[OutputWriter]] instance is created and initialized when a new output file is opened on
+ * executor side.  This instance is used to persist rows to this single output file.
+ */
+abstract class OutputWriter {
+  /**
+   * Persists a single row.  Invoked on the executor side.  When writing to dynamically
+   * partitioned tables, dynamic partition columns are not included in rows to be written.
+   *
+   * @since 1.4.0
+   */
+  def write(row: Row): Unit
+
+  /**
+   * Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted,
+   * before the task output is committed.
+   *
+   * @since 1.4.0
+   */
+  def close(): Unit
+
+  private var converter: InternalRow => Row = _
+
+  protected[sql] def initConverter(dataSchema: StructType) = {
+    converter =
+      CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
+  }
+
+  protected[sql] def writeInternal(row: InternalRow): Unit = {
+    write(converter(row))
+  }
+}
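
As a rough illustration of the contract, here is a hypothetical factory/writer pair that writes
each Row as one comma-separated line to the exact path it is handed. It is a sketch only (no error
handling, escaping, or bucketing support) and assumes the `OutputWriterFactory` and `OutputWriter`
definitions above.

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.TaskAttemptContext

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
    import org.apache.spark.sql.types.StructType

    // Hypothetical: serialized to executors, where it creates one writer per output file.
    class CsvLineOutputWriterFactory extends OutputWriterFactory {
      override def newInstance(
          path: String,
          bucketId: Option[Int], // ignored in this sketch
          dataSchema: StructType,
          context: TaskAttemptContext): OutputWriter =
        new CsvLineOutputWriter(path, context)
    }

    // Hypothetical: writes to the exact `path` it is given, one line per row.
    class CsvLineOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter {
      private val outputPath = new Path(path)
      private val stream = outputPath.getFileSystem(context.getConfiguration).create(outputPath)

      override def write(row: Row): Unit =
        stream.write((row.mkString(",") + "\n").getBytes("UTF-8"))

      override def close(): Unit = stream.close()
    }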

http://git-wip-us.apache.org/repos/asf/spark/blob/6c29b3de/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
deleted file mode 100644
index 69dd622..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ /dev/null
@@ -1,349 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs._
-import org.apache.hadoop.io.compress.{CompressionCodecFactory, 
SplittableCompressionCodec}
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.expressions._
-import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.execution.FileRelation
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, Filter}
-import org.apache.spark.sql.types.StructType
-
-/**
- * ::Experimental::
- * A factory that produces [[OutputWriter]]s.  A new [[OutputWriterFactory]] 
is created on driver
- * side for each write job issued when writing to a [[HadoopFsRelation]], and 
then gets serialized
- * to executor side to create actual [[OutputWriter]]s on the fly.
- *
- * @since 1.4.0
- */
-@Experimental
-abstract class OutputWriterFactory extends Serializable {
-  /**
-   * When writing to a [[HadoopFsRelation]], this method gets called by each 
task on executor side
-   * to instantiate new [[OutputWriter]]s.
-   *
-   * @param path Path of the file to which this [[OutputWriter]] is supposed 
to write.  Note that
-   *        this may not point to the final output file.  For example, 
`FileOutputFormat` writes to
-   *        temporary directories and then merge written files back to the 
final destination.  In
-   *        this case, `path` points to a temporary output file under the 
temporary directory.
-   * @param dataSchema Schema of the rows to be written. Partition columns are 
not included in the
-   *        schema if the relation being written is partitioned.
-   * @param context The Hadoop MapReduce task context.
-   * @since 1.4.0
-   */
-  def newInstance(
-      path: String,
-      bucketId: Option[Int], // TODO: This doesn't belong here...
-      dataSchema: StructType,
-      context: TaskAttemptContext): OutputWriter
-
-  /**
-   * Returns a new instance of [[OutputWriter]] that will write data to the 
given path.
-   * This method gets called by each task on executor to write 
[[InternalRow]]s to
-   * format-specific files. Compared to the other `newInstance()`, this is a 
newer API that
-   * passes only the path that the writer must write to. The writer must write 
to the exact path
-   * and not modify it (do not add subdirectories, extensions, etc.). All other
-   * file-format-specific information needed to create the writer must be 
passed
-   * through the [[OutputWriterFactory]] implementation.
-   * @since 2.0.0
-   */
-  def newWriter(path: String): OutputWriter = {
-    throw new UnsupportedOperationException("newInstance with just path not 
supported")
-  }
-}
-
-/**
- * ::Experimental::
- * [[OutputWriter]] is used together with [[HadoopFsRelation]] for persisting 
rows to the
- * underlying file system.  Subclasses of [[OutputWriter]] must provide a 
zero-argument constructor.
- * An [[OutputWriter]] instance is created and initialized when a new output 
file is opened on
- * executor side.  This instance is used to persist rows to this single output 
file.
- *
- * @since 1.4.0
- */
-@Experimental
-abstract class OutputWriter {
-  /**
-   * Persists a single row.  Invoked on the executor side.  When writing to 
dynamically partitioned
-   * tables, dynamic partition columns are not included in rows to be written.
-   *
-   * @since 1.4.0
-   */
-  def write(row: Row): Unit
-
-  /**
-   * Closes the [[OutputWriter]]. Invoked on the executor side after all rows 
are persisted, before
-   * the task output is committed.
-   *
-   * @since 1.4.0
-   */
-  def close(): Unit
-
-  private var converter: InternalRow => Row = _
-
-  protected[sql] def initConverter(dataSchema: StructType) = {
-    converter =
-      
CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow
 => Row]
-  }
-
-  protected[sql] def writeInternal(row: InternalRow): Unit = {
-    write(converter(row))
-  }
-}
-
-/**
- * Acts as a container for all of the metadata required to read from a 
datasource. All discovery,
- * resolution and merging logic for schemas and partitions has been removed.
- *
- * @param location A [[FileCatalog]] that can enumerate the locations of all 
the files that comprise
- *                 this relation.
- * @param partitionSchema The schema of the columns (if any) that are used to 
partition the relation
- * @param dataSchema The schema of any remaining columns.  Note that if any 
partition columns are
- *                   present in the actual data files as well, they are 
preserved.
- * @param bucketSpec Describes the bucketing (hash-partitioning of the files 
by some column values).
- * @param fileFormat A file format that can be used to read and write the data 
in files.
- * @param options Configuration used when reading / writing data.
- */
-case class HadoopFsRelation(
-    location: FileCatalog,
-    partitionSchema: StructType,
-    dataSchema: StructType,
-    bucketSpec: Option[BucketSpec],
-    fileFormat: FileFormat,
-    options: Map[String, String])(val sparkSession: SparkSession)
-  extends BaseRelation with FileRelation {
-
-  override def sqlContext: SQLContext = sparkSession.sqlContext
-
-  val schema: StructType = {
-    val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
-    StructType(dataSchema ++ partitionSchema.filterNot { column =>
-      dataSchemaColumnNames.contains(column.name.toLowerCase)
-    })
-  }
-
-  def partitionSchemaOption: Option[StructType] =
-    if (partitionSchema.isEmpty) None else Some(partitionSchema)
-  def partitionSpec: PartitionSpec = location.partitionSpec()
-
-  def refresh(): Unit = location.refresh()
-
-  override def toString: String = {
-    fileFormat match {
-      case source: DataSourceRegister => source.shortName()
-      case _ => "HadoopFiles"
-    }
-  }
-
-  /** Returns the list of files that will be read when scanning this relation. 
*/
-  override def inputFiles: Array[String] =
-    location.allFiles().map(_.getPath.toUri.toString).toArray
-
-  override def sizeInBytes: Long = location.allFiles().map(_.getLen).sum
-}
-
-/**
- * Used to read and write data stored in files to/from the [[InternalRow]] 
format.
- */
-trait FileFormat {
-  /**
-   * When possible, this method should return the schema of the given `files`. 
 When the format
-   * does not support inference, or no valid files are given should return 
None.  In these cases
-   * Spark will require that user specify the schema manually.
-   */
-  def inferSchema(
-      sparkSession: SparkSession,
-      options: Map[String, String],
-      files: Seq[FileStatus]): Option[StructType]
-
-  /**
-   * Prepares a write job and returns an [[OutputWriterFactory]].  Client side 
job preparation can
-   * be put here.  For example, user defined output committer can be 
configured here
-   * by setting the output committer class in the conf of 
spark.sql.sources.outputCommitterClass.
-   */
-  def prepareWrite(
-      sparkSession: SparkSession,
-      job: Job,
-      options: Map[String, String],
-      dataSchema: StructType): OutputWriterFactory
-
-  /**
-   * Returns a [[OutputWriterFactory]] for generating output writers that can 
write data.
-   * This method is current used only by FileStreamSinkWriter to generate 
output writers that
-   * does not use output committers to write data. The OutputWriter generated 
by the returned
-   * [[OutputWriterFactory]] must implement the method `newWriter(path)`..
-   */
-  def buildWriter(
-      sqlContext: SQLContext,
-      dataSchema: StructType,
-      options: Map[String, String]): OutputWriterFactory = {
-    // TODO: Remove this default implementation when the other formats have 
been ported
-    throw new UnsupportedOperationException(s"buildWriter is not supported for 
$this")
-  }
-
-  /**
-   * Returns whether this format support returning columnar batch or not.
-   *
-   * TODO: we should just have different traits for the different formats.
-   */
-  def supportBatch(sparkSession: SparkSession, dataSchema: StructType): 
Boolean = {
-    false
-  }
-
-  /**
-   * Returns whether a file with `path` could be splitted or not.
-   */
-  def isSplitable(
-      sparkSession: SparkSession,
-      options: Map[String, String],
-      path: Path): Boolean = {
-    false
-  }
-
-  /**
-   * Returns a function that can be used to read a single file in as an 
Iterator of InternalRow.
-   *
-   * @param dataSchema The global data schema. It can be either specified by 
the user, or
-   *                   reconciled/merged from all underlying data files. If 
any partition columns
-   *                   are contained in the files, they are preserved in this 
schema.
-   * @param partitionSchema The schema of the partition column row that will 
be present in each
-   *                        PartitionedFile. These columns should be appended 
to the rows that
-   *                        are produced by the iterator.
-   * @param requiredSchema The schema of the data that should be output for 
each row.  This may be a
-   *                       subset of the columns that are present in the file 
if column pruning has
-   *                       occurred.
-   * @param filters A set of filters than can optionally be used to reduce the 
number of rows output
-   * @param options A set of string -> string configuration options.
-   * @return
-   */
-  def buildReader(
-      sparkSession: SparkSession,
-      dataSchema: StructType,
-      partitionSchema: StructType,
-      requiredSchema: StructType,
-      filters: Seq[Filter],
-      options: Map[String, String],
-      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    // TODO: Remove this default implementation when the other formats have 
been ported
-    // Until then we guard in [[FileSourceStrategy]] to only call this method 
on supported formats.
-    throw new UnsupportedOperationException(s"buildReader is not supported for 
$this")
-  }
-
-  /**
-   * Exactly the same as [[buildReader]] except that the reader function 
returned by this method
-   * appends partition values to [[InternalRow]]s produced by the reader 
function [[buildReader]]
-   * returns.
-   */
-  def buildReaderWithPartitionValues(
-      sparkSession: SparkSession,
-      dataSchema: StructType,
-      partitionSchema: StructType,
-      requiredSchema: StructType,
-      filters: Seq[Filter],
-      options: Map[String, String],
-      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    val dataReader = buildReader(
-      sparkSession, dataSchema, partitionSchema, requiredSchema, filters, 
options, hadoopConf)
-
-    new (PartitionedFile => Iterator[InternalRow]) with Serializable {
-      private val fullSchema = requiredSchema.toAttributes ++ 
partitionSchema.toAttributes
-
-      private val joinedRow = new JoinedRow()
-
-      // Using lazy val to avoid serialization
-      private lazy val appendPartitionColumns =
-        GenerateUnsafeProjection.generate(fullSchema, fullSchema)
-
-      override def apply(file: PartitionedFile): Iterator[InternalRow] = {
-        // Using local val to avoid per-row lazy val check (pre-mature 
optimization?...)
-        val converter = appendPartitionColumns
-
-        // Note that we have to apply the converter even though 
`file.partitionValues` is empty.
-        // This is because the converter is also responsible for converting 
safe `InternalRow`s into
-        // `UnsafeRow`s.
-        dataReader(file).map { dataRow =>
-          converter(joinedRow(dataRow, file.partitionValues))
-        }
-      }
-    }
-  }
-
-}
-
-/**
- * The base class file format that is based on text file.
- */
-abstract class TextBasedFileFormat extends FileFormat {
-  private var codecFactory: CompressionCodecFactory = null
-  override def isSplitable(
-      sparkSession: SparkSession,
-      options: Map[String, String],
-      path: Path): Boolean = {
-    if (codecFactory == null) {
-      codecFactory = new CompressionCodecFactory(
-        sparkSession.sessionState.newHadoopConfWithOptions(options))
-    }
-    val codec = codecFactory.getCodec(path)
-    codec == null || codec.isInstanceOf[SplittableCompressionCodec]
-  }
-}
-
-/**
- * A collection of data files from a partitioned relation, along with the 
partition values in the
- * form of an [[InternalRow]].
- */
-case class Partition(values: InternalRow, files: Seq[FileStatus])
-
-/**
- * An interface for objects capable of enumerating the files that comprise a 
relation as well
- * as the partitioning characteristics of those files.
- */
-trait FileCatalog {
-
-  /** Returns the list of input paths from which the catalog will get files. */
-  def paths: Seq[Path]
-
-  /** Returns the specification of the partitions inferred from the data. */
-  def partitionSpec(): PartitionSpec
-
-  /**
-   * Returns all valid files grouped into partitions when the data is 
partitioned. If the data is
-   * unpartitioned, this will return a single partition with no partition 
values.
-   *
-   * @param filters The filters used to prune which partitions are returned.  
These filters must
-   *                only refer to partition columns and this method will only 
return files
-   *                where these predicates are guaranteed to evaluate to 
`true`.  Thus, these
-   *                filters will not need to be evaluated again on the 
returned data.
-   */
-  def listFiles(filters: Seq[Expression]): Seq[Partition]
-
-  /** Returns all the valid files. */
-  def allFiles(): Seq[FileStatus]
-
-  /** Refresh the file listing */
-  def refresh(): Unit
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6c29b3de/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 4a308ff..6faafed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -425,150 +425,6 @@ class ParquetFileFormat
   }
 }
 
-/**
- * A factory for generating OutputWriters for writing parquet files. This 
implemented is different
- * from the [[ParquetOutputWriter]] as this does not use any 
[[OutputCommitter]]. It simply
- * writes the data to the path used to generate the output writer. Callers of 
this factory
- * has to ensure which files are to be considered as committed.
- */
-private[parquet] class ParquetOutputWriterFactory(
-    sqlConf: SQLConf,
-    dataSchema: StructType,
-    hadoopConf: Configuration,
-    options: Map[String, String]) extends OutputWriterFactory {
-
-  private val serializableConf: SerializableConfiguration = {
-    val job = Job.getInstance(hadoopConf)
-    val conf = ContextUtil.getConfiguration(job)
-    val parquetOptions = new ParquetOptions(options, sqlConf)
-
-    // We're not really using `ParquetOutputFormat[Row]` for writing data 
here, because we override
-    // it in `ParquetOutputWriter` to support appending and dynamic 
partitioning.  The reason why
-    // we set it here is to setup the output committer class to 
`ParquetOutputCommitter`, which is
-    // bundled with `ParquetOutputFormat[Row]`.
-    job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
-
-    ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
-
-    // We want to clear this temporary metadata from saving into Parquet file.
-    // This metadata is only useful for detecting optional columns when 
pushing down filters.
-    val dataSchemaToWrite = StructType.removeMetadata(
-      StructType.metadataKeyForOptionalField,
-      dataSchema).asInstanceOf[StructType]
-    ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)
-
-    // Sets flags for `CatalystSchemaConverter` (which converts Catalyst 
schema to Parquet schema)
-    // and `CatalystWriteSupport` (writing actual rows to Parquet files).
-    conf.set(
-      SQLConf.PARQUET_BINARY_AS_STRING.key,
-      sqlConf.isParquetBinaryAsString.toString)
-
-    conf.set(
-      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-      sqlConf.isParquetINT96AsTimestamp.toString)
-
-    conf.set(
-      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
-      sqlConf.writeLegacyParquetFormat.toString)
-
-    // Sets compression scheme
-    conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodec)
-    new SerializableConfiguration(conf)
-  }
-
-  /**
-   * Returns a [[OutputWriter]] that writes data to the give path without using
-   * [[OutputCommitter]].
-   */
-  override def newWriter(path: String): OutputWriter = new OutputWriter {
-
-    // Create TaskAttemptContext that is used to pass on Configuration to the 
ParquetRecordWriter
-    private val hadoopTaskAttemptId = new TaskAttemptID(new TaskID(new JobID, 
TaskType.MAP, 0), 0)
-    private val hadoopAttemptContext = new TaskAttemptContextImpl(
-      serializableConf.value, hadoopTaskAttemptId)
-
-    // Instance of ParquetRecordWriter that does not use OutputCommitter
-    private val recordWriter = createNoCommitterRecordWriter(path, 
hadoopAttemptContext)
-
-    override def write(row: Row): Unit = {
-      throw new UnsupportedOperationException("call writeInternal")
-    }
-
-    protected[sql] override def writeInternal(row: InternalRow): Unit = {
-      recordWriter.write(null, row)
-    }
-
-    override def close(): Unit = recordWriter.close(hadoopAttemptContext)
-  }
-
-  /** Create a [[ParquetRecordWriter]] that writes the given path without 
using OutputCommitter */
-  private def createNoCommitterRecordWriter(
-      path: String,
-      hadoopAttemptContext: TaskAttemptContext): RecordWriter[Void, 
InternalRow] = {
-    // Custom ParquetOutputFormat that disable use of committer and writes to 
the given path
-    val outputFormat = new ParquetOutputFormat[InternalRow]() {
-      override def getOutputCommitter(c: TaskAttemptContext): OutputCommitter 
= { null }
-      override def getDefaultWorkFile(c: TaskAttemptContext, ext: String): 
Path = { new Path(path) }
-    }
-    outputFormat.getRecordWriter(hadoopAttemptContext)
-  }
-
-  /** Disable the use of the older API. */
-  def newInstance(
-      path: String,
-      bucketId: Option[Int],
-      dataSchema: StructType,
-      context: TaskAttemptContext): OutputWriter = {
-    throw new UnsupportedOperationException(
-      "this version of newInstance not supported for " +
-        "ParquetOutputWriterFactory")
-  }
-}
-
-
-// NOTE: This class is instantiated and used on executor side only, no need to 
be serializable.
-private[parquet] class ParquetOutputWriter(
-    path: String,
-    bucketId: Option[Int],
-    context: TaskAttemptContext)
-  extends OutputWriter {
-
-  private val recordWriter: RecordWriter[Void, InternalRow] = {
-    val outputFormat = {
-      new ParquetOutputFormat[InternalRow]() {
-        // Here we override `getDefaultWorkFile` for two reasons:
-        //
-        //  1. To allow appending.  We need to generate unique output file 
names to avoid
-        //     overwriting existing files (either exist before the write job, 
or are just written
-        //     by other tasks within the same write job).
-        //
-        //  2. To allow dynamic partitioning.  Default `getDefaultWorkFile` 
uses
-        //     `FileOutputCommitter.getWorkPath()`, which points to the base 
directory of all
-        //     partitions in the case of dynamic partitioning.
-        override def getDefaultWorkFile(context: TaskAttemptContext, 
extension: String): Path = {
-          val configuration = context.getConfiguration
-          val uniqueWriteJobId = 
configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
-          val taskAttemptId = context.getTaskAttemptID
-          val split = taskAttemptId.getTaskID.getId
-          val bucketString = 
bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
-          // It has the `.parquet` extension at the end because 
(de)compression tools
-          // such as gunzip would not be able to decompress this as the 
compression
-          // is not applied on this whole file but on each "page" in Parquet 
format.
-          new Path(path, 
f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
-        }
-      }
-    }
-
-    outputFormat.getRecordWriter(context)
-  }
-
-  override def write(row: Row): Unit = throw new 
UnsupportedOperationException("call writeInternal")
-
-  override def writeInternal(row: InternalRow): Unit = 
recordWriter.write(null, row)
-
-  override def close(): Unit = recordWriter.close(context)
-}
-
 object ParquetFileFormat extends Logging {
   private[parquet] def readSchema(
       footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/6c29b3de/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala
new file mode 100644
index 0000000..f89ce05
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.parquet.hadoop.{ParquetOutputFormat, ParquetRecordWriter}
+import org.apache.parquet.hadoop.util.ContextUtil
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{BucketingUtils, OutputWriter, OutputWriterFactory, WriterContainer}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+
+/**
+ * A factory for generating OutputWriters for writing parquet files. This implementation is
+ * different from [[ParquetOutputWriter]] as it does not use any [[OutputCommitter]]. It simply
+ * writes the data to the path used to generate the output writer. Callers of this factory
+ * have to ensure which files are to be considered as committed.
+ */
+private[parquet] class ParquetOutputWriterFactory(
+    sqlConf: SQLConf,
+    dataSchema: StructType,
+    hadoopConf: Configuration,
+    options: Map[String, String])
+  extends OutputWriterFactory {
+
+  private val serializableConf: SerializableConfiguration = {
+    val job = Job.getInstance(hadoopConf)
+    val conf = ContextUtil.getConfiguration(job)
+    val parquetOptions = new ParquetOptions(options, sqlConf)
+
+    // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we
+    // override it in `ParquetOutputWriter` to support appending and dynamic partitioning.  The
+    // reason why we set it here is to set up the output committer class to
+    // `ParquetOutputCommitter`, which is bundled with `ParquetOutputFormat[Row]`.
+    job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
+
+    ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
+
+    // We want to remove this temporary metadata so that it is not saved into the Parquet file.
+    // This metadata is only useful for detecting optional columns when pushing down filters.
+    val dataSchemaToWrite = StructType.removeMetadata(
+      StructType.metadataKeyForOptionalField,
+      dataSchema).asInstanceOf[StructType]
+    ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)
+
+    // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
+    // and `CatalystWriteSupport` (writing actual rows to Parquet files).
+    conf.set(
+      SQLConf.PARQUET_BINARY_AS_STRING.key,
+      sqlConf.isParquetBinaryAsString.toString)
+
+    conf.set(
+      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      sqlConf.isParquetINT96AsTimestamp.toString)
+
+    conf.set(
+      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
+      sqlConf.writeLegacyParquetFormat.toString)
+
+    // Sets compression scheme
+    conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodec)
+    new SerializableConfiguration(conf)
+  }
+
+  /**
+   * Returns an [[OutputWriter]] that writes data to the given path without using an
+   * [[OutputCommitter]].
+   */
+  override def newWriter(path: String): OutputWriter = new OutputWriter {
+
+    // Create TaskAttemptContext that is used to pass on Configuration to the ParquetRecordWriter
+    private val hadoopTaskAttemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0)
+    private val hadoopAttemptContext = new TaskAttemptContextImpl(
+      serializableConf.value, hadoopTaskAttemptId)
+
+    // Instance of ParquetRecordWriter that does not use OutputCommitter
+    private val recordWriter = createNoCommitterRecordWriter(path, hadoopAttemptContext)
+
+    override def write(row: Row): Unit = {
+      throw new UnsupportedOperationException("call writeInternal")
+    }
+
+    protected[sql] override def writeInternal(row: InternalRow): Unit = {
+      recordWriter.write(null, row)
+    }
+
+    override def close(): Unit = recordWriter.close(hadoopAttemptContext)
+  }
+
+  /** Creates a [[ParquetRecordWriter]] that writes to the given path without using an OutputCommitter */
+  private def createNoCommitterRecordWriter(
+      path: String,
+      hadoopAttemptContext: TaskAttemptContext): RecordWriter[Void, InternalRow] = {
+    // Custom ParquetOutputFormat that disables use of the committer and writes to the given path
+    val outputFormat = new ParquetOutputFormat[InternalRow]() {
+      override def getOutputCommitter(c: TaskAttemptContext): OutputCommitter = { null }
+      override def getDefaultWorkFile(c: TaskAttemptContext, ext: String): Path = { new Path(path) }
+    }
+    outputFormat.getRecordWriter(hadoopAttemptContext)
+  }
+
+  /** Disable the use of the older API. */
+  def newInstance(
+      path: String,
+      bucketId: Option[Int],
+      dataSchema: StructType,
+      context: TaskAttemptContext): OutputWriter = {
+    throw new UnsupportedOperationException(
+      "this version of newInstance not supported for " +
+        "ParquetOutputWriterFactory")
+  }
+}
+
+
+// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
+private[parquet] class ParquetOutputWriter(
+    path: String,
+    bucketId: Option[Int],
+    context: TaskAttemptContext)
+  extends OutputWriter {
+
+  private val recordWriter: RecordWriter[Void, InternalRow] = {
+    val outputFormat = {
+      new ParquetOutputFormat[InternalRow]() {
+        // Here we override `getDefaultWorkFile` for two reasons:
+        //
+        //  1. To allow appending.  We need to generate unique output file names to avoid
+        //     overwriting existing files (files that either existed before the write job, or were
+        //     just written by other tasks within the same write job).
+        //
+        //  2. To allow dynamic partitioning.  The default `getDefaultWorkFile` uses
+        //     `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
+        //     partitions in the case of dynamic partitioning.
+        override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+          val configuration = context.getConfiguration
+          val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
+          val taskAttemptId = context.getTaskAttemptID
+          val split = taskAttemptId.getTaskID.getId
+          val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
+          // It has the `.parquet` extension at the end because (de)compression tools
+          // such as gunzip would not be able to decompress this, as the compression
+          // is not applied on this whole file but on each "page" in the Parquet format.
+          new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
+        }
+      }
+    }
+
+    outputFormat.getRecordWriter(context)
+  }
+
+  override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
+
+  override def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row)
+
+  override def close(): Unit = recordWriter.close(context)
+}
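
For reference, the overridden `getDefaultWorkFile` above yields names of the form
part-r-NNNNN-<write job UUID><bucket suffix><extension>. A tiny standalone sketch of just that
formatting step, with purely illustrative inputs:

    object ParquetWorkFileNameSketch {
      // Mirrors the f-interpolator used in getDefaultWorkFile above.
      def workFileName(split: Int, jobId: String, bucketString: String, extension: String): String =
        f"part-r-$split%05d-$jobId$bucketString$extension"

      def main(args: Array[String]): Unit = {
        // Prints something like: part-r-00003-1a2b3c4d_00002.snappy.parquet
        println(workFileName(3, "1a2b3c4d", "_00002", ".snappy.parquet"))
      }
    }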


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
