Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/15710#discussion_r86056877
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala ---
@@ -17,125 +17,13 @@
package org.apache.spark.sql.execution.datasources.parquet
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.parquet.hadoop.{ParquetOutputFormat, ParquetRecordWriter}
-import org.apache.parquet.hadoop.codec.CodecConfig
-import org.apache.parquet.hadoop.util.ContextUtil
+import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
-
-
-/**
- * A factory for generating OutputWriters for writing parquet files. This implementation is different
- * from the [[ParquetOutputWriter]] as it does not use any [[OutputCommitter]]. It simply
- * writes the data to the path used to generate the output writer. Callers of this factory
- * have to decide which files are to be considered committed.
- */
-private[parquet] class ParquetOutputWriterFactory(
- sqlConf: SQLConf,
- dataSchema: StructType,
- hadoopConf: Configuration,
- options: Map[String, String])
- extends OutputWriterFactory {
-
- private val serializableConf: SerializableConfiguration = {
- val job = Job.getInstance(hadoopConf)
- val conf = ContextUtil.getConfiguration(job)
- val parquetOptions = new ParquetOptions(options, sqlConf)
-
- // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
- // it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why
- // we set it here is to set up the output committer class to `ParquetOutputCommitter`, which is
- // bundled with `ParquetOutputFormat[Row]`.
- job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
-
- ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
-
- // We want to clear this temporary metadata so that it is not saved into the Parquet file.
- // This metadata is only useful for detecting optional columns when pushing down filters.
- val dataSchemaToWrite = StructType.removeMetadata(
- StructType.metadataKeyForOptionalField,
- dataSchema).asInstanceOf[StructType]
- ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)
-
- // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
- // and `CatalystWriteSupport` (writing actual rows to Parquet files).
- conf.set(
- SQLConf.PARQUET_BINARY_AS_STRING.key,
- sqlConf.isParquetBinaryAsString.toString)
-
- conf.set(
- SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
- sqlConf.isParquetINT96AsTimestamp.toString)
-
- conf.set(
- SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
- sqlConf.writeLegacyParquetFormat.toString)
-
- // Sets compression scheme
- conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
- new SerializableConfiguration(conf)
- }
-
- /**
- * Returns a [[OutputWriter]] that writes data to the given path without using
- * [[OutputCommitter]].
- */
- override def newWriter(path: String): OutputWriter = new OutputWriter {
-
- // Create TaskAttemptContext that is used to pass on Configuration to the ParquetRecordWriter
- private val hadoopTaskAttemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0)
- private val hadoopAttemptContext = new TaskAttemptContextImpl(
- serializableConf.value, hadoopTaskAttemptId)
-
- // Instance of ParquetRecordWriter that does not use OutputCommitter
- private val recordWriter = createNoCommitterRecordWriter(path, hadoopAttemptContext)
-
- override def write(row: Row): Unit = {
- throw new UnsupportedOperationException("call writeInternal")
- }
-
- protected[sql] override def writeInternal(row: InternalRow): Unit = {
- recordWriter.write(null, row)
- }
-
- override def close(): Unit = recordWriter.close(hadoopAttemptContext)
- }
-
- /** Create a [[ParquetRecordWriter]] that writes to the given path without using OutputCommitter */
- private def createNoCommitterRecordWriter(
- path: String,
- hadoopAttemptContext: TaskAttemptContext): RecordWriter[Void, InternalRow] = {
- // Custom ParquetOutputFormat that disables the use of a committer and writes to the given path
- val outputFormat = new ParquetOutputFormat[InternalRow]() {
- override def getOutputCommitter(c: TaskAttemptContext): OutputCommitter = { null }
- override def getDefaultWorkFile(c: TaskAttemptContext, ext: String): Path = { new Path(path) }
- }
- outputFormat.getRecordWriter(hadoopAttemptContext)
- }
-
- /** Disable the use of the older API. */
- override def newInstance(
- path: String,
- dataSchema: StructType,
- context: TaskAttemptContext): OutputWriter = {
- throw new UnsupportedOperationException("this version of newInstance not supported for " +
- "ParquetOutputWriterFactory")
- }
-
- override def getFileExtension(context: TaskAttemptContext): String = {
- CodecConfig.from(context).getCodec.getExtension + ".parquet"
- }
-}
-
+import org.apache.spark.sql.execution.datasources.OutputWriter
--- End diff ---
Why is this down here?