Github user dongjoon-hyun commented on a diff in the pull request:
https://github.com/apache/spark/pull/19651#discussion_r149310995
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala ---
@@ -39,3 +58,134 @@ private[sql] object OrcFileFormat {
names.foreach(checkFieldName)
}
}
+
+class DefaultSource extends OrcFileFormat
+
+/**
+ * New ORC File Format based on Apache ORC 1.4.1 and above.
+ */
+class OrcFileFormat
+ extends FileFormat
+ with DataSourceRegister
+ with Serializable {
+
+ override def shortName(): String = "orc"
+
+ override def toString: String = "ORC_1.4"
+
+ override def hashCode(): Int = getClass.hashCode()
+
+ override def equals(other: Any): Boolean = other.isInstanceOf[OrcFileFormat]
+
+ override def inferSchema(
+ sparkSession: SparkSession,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType] = {
+ OrcUtils.readSchema(sparkSession, files)
+ }
+
+ override def prepareWrite(
+ sparkSession: SparkSession,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+ val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
+
+ val conf = job.getConfiguration
+
+ conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, OrcUtils.getSchemaString(dataSchema))
+
+ conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
+
+ conf.asInstanceOf[JobConf]
+ .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])
+
+ new OutputWriterFactory {
+ override def newInstance(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ new OrcOutputWriter(path, dataSchema, context)
+ }
+
+ override def getFileExtension(context: TaskAttemptContext): String = {
+ val compressionExtension: String = {
+ val name = context.getConfiguration.get(COMPRESS.getAttribute)
+ OrcOptions.extensionsForCompressionCodecNames.getOrElse(name, "")
+ }
+
+ compressionExtension + ".orc"
+ }
+ }
+ }
+
+ override def isSplitable(
+ sparkSession: SparkSession,
+ options: Map[String, String],
+ path: Path): Boolean = {
+ true
+ }
+
+ override def buildReaderWithPartitionValues(
--- End diff --
Yep. I see. It was because I preferred to be consistent with
`ParquetFileFormat` here.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]