Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19651#discussion_r150200482
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala ---
@@ -39,3 +58,120 @@ private[sql] object OrcFileFormat {
names.foreach(checkFieldName)
}
}
+
+class DefaultSource extends OrcFileFormat
+
+/**
+ * New ORC File Format based on Apache ORC 1.4.1 and above.
+ */
+class OrcFileFormat
+ extends FileFormat
+ with DataSourceRegister
+ with Serializable {
+
+ override def shortName(): String = "orc"
+
+ override def toString: String = "ORC_1.4"
+
+ override def hashCode(): Int = getClass.hashCode()
+
+  override def equals(other: Any): Boolean = other.isInstanceOf[OrcFileFormat]
+
+ override def inferSchema(
+ sparkSession: SparkSession,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType] = {
+ OrcUtils.readSchema(sparkSession, files)
+ }
+
+ override def prepareWrite(
+ sparkSession: SparkSession,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+    val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
+
+ val conf = job.getConfiguration
+
+    conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, OrcUtils.getSchemaString(dataSchema))
+
+ conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
+
+    conf.asInstanceOf[JobConf]
+      .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])
+
+ new OutputWriterFactory {
+ override def newInstance(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ new OrcOutputWriter(path, dataSchema, context)
+ }
+
+      override def getFileExtension(context: TaskAttemptContext): String = {
+ val compressionExtension: String = {
+ val name = context.getConfiguration.get(COMPRESS.getAttribute)
+ OrcUtils.extensionsForCompressionCodecNames.getOrElse(name, "")
+ }
+
+ compressionExtension + ".orc"
+ }
+ }
+ }
+
+ override def isSplitable(
+ sparkSession: SparkSession,
+ options: Map[String, String],
+ path: Path): Boolean = {
+ true
+ }
+
+ override def buildReader(
+ sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+ if (sparkSession.sessionState.conf.orcFilterPushDown) {
+ OrcFilters.createFilter(dataSchema, filters).foreach { f =>
+        OrcInputFormat.setSearchArgument(hadoopConf, f, dataSchema.fieldNames)
+ }
+ }
+
+    val broadcastedConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+ val resolver = sparkSession.sessionState.conf.resolver
--- End diff ---
nit: we can use `sparkSession.sessionState.conf.isCaseSensitive` here, as
it's much cheaper than serializing a function.
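For illustration only, a minimal sketch of the change this nit points at. Assumptions: the surrounding `buildReader` body is elided, the closure body is a placeholder, the boolean accessor is the one named in the comment above, and the case-matching logic is an illustrative stand-in rather than Spark's actual resolver implementation:

```scala
// Capture only a Boolean on the driver side; it serializes trivially,
// unlike the (String, String) => Boolean resolver function.
val isCaseSensitive = sparkSession.sessionState.conf.isCaseSensitive

(file: PartitionedFile) => {
  // If name matching is needed on the executor, an equivalent resolver
  // can be rebuilt locally from the captured boolean.
  val resolver: (String, String) => Boolean = { (a: String, b: String) =>
    if (isCaseSensitive) a == b else a.equalsIgnoreCase(b)
  }
  // placeholder: the real reader would use `resolver` to match requested
  // column names against the ORC file schema and return the row iterator
  Iterator.empty
}
```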
---