SaurabhChawla100 commented on a change in pull request #29045:
URL: https://github.com/apache/spark/pull/29045#discussion_r455018000
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
##########
@@ -66,24 +66,28 @@ case class OrcPartitionReaderFactory(
override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = {
val conf = broadcastedConf.value.value
- val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema)
- OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive)
val filePath = new Path(new URI(file.filePath))
val fs = filePath.getFileSystem(conf)
val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
- val requestedColIdsOrEmptyFile =
+ val resultedColPruneInfo =
Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
OrcUtils.requestedColumnIds(
isCaseSensitive, dataSchema, readDataSchema, reader, conf)
}
- if (requestedColIdsOrEmptyFile.isEmpty) {
+ if (resultedColPruneInfo.isEmpty) {
new EmptyPartitionReader[InternalRow]
} else {
- val requestedColIds = requestedColIdsOrEmptyFile.get
+ val (requestedColIds, canPruneCols) = resultedColPruneInfo.get
+ val resultSchemaString = if (canPruneCols) {
+ OrcUtils.orcTypeDescriptionString(resultSchema)
+ } else {
+ OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields))
+ }
+ OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
Review comment:
@cloud-fan - In this case we would also need to return the resultSchemaString from
OrcUtils.requestedColumnIds, i.e. change its return type to Option[(Array[Int], String)],
with the string built inside that method as

    } else if (orcFieldNames.forall(_.startsWith("_col"))) {
      val resultSchemaString = OrcUtils.orcTypeDescriptionString(
        StructType(dataSchema.fields ++ partitionSchema.fields))
      ...
    } else {
      val resultSchemaString = OrcUtils.orcTypeDescriptionString(
        StructType(requiredSchema.fields ++ partitionSchema.fields))
      ...
    }

since we also use this resultSchemaString in

    batchReader.initBatch(
      TypeDescription.fromString(resultSchemaString),
      resultSchema.fields,
      ...)

Shall we make that change, or instead pull a helper method out of this code into OrcUtils?

    val resultSchemaString = someMethod()

    def someMethod(): String = {
      val resultSchemaString = if (canPruneCols) {
        OrcUtils.orcTypeDescriptionString(resultSchema)
      } else {
        OrcUtils.orcTypeDescriptionString(
          StructType(dataSchema.fields ++ partitionSchema.fields))
      }
      OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
      resultSchemaString
    }
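
For illustration only, the first option could look roughly like the sketch below. The
method shape, the extra partitionSchema parameter, and the ??? placeholders standing in
for the existing column-id resolution are assumptions made for this sketch, not what the
PR currently implements.

    import scala.collection.JavaConverters._

    import org.apache.hadoop.conf.Configuration
    import org.apache.orc.Reader
    import org.apache.spark.sql.types.StructType

    // Sketch: requestedColumnIds also returns the ORC result-schema string,
    // so OrcPartitionReaderFactory only has to publish it to the conf.
    def requestedColumnIds(
        isCaseSensitive: Boolean,
        dataSchema: StructType,
        requiredSchema: StructType,
        partitionSchema: StructType,  // extra parameter, needed to build the string
        reader: Reader,
        conf: Configuration): Option[(Array[Int], String)] = {
      val orcFieldNames = reader.getSchema.getFieldNames.asScala
      if (orcFieldNames.isEmpty) {
        // Empty file: the caller falls back to an EmptyPartitionReader.
        None
      } else if (orcFieldNames.forall(_.startsWith("_col"))) {
        // Positional "_col0", "_col1", ... names: columns cannot be pruned, so
        // the schema string must cover the full data schema plus partitions.
        val ids: Array[Int] = ???  // existing position-based id resolution
        Some((ids, OrcUtils.orcTypeDescriptionString(
          StructType(dataSchema.fields ++ partitionSchema.fields))))
      } else {
        // Real field names: prune down to the required columns plus partitions.
        val ids: Array[Int] = ???  // existing name-based id resolution
        Some((ids, OrcUtils.orcTypeDescriptionString(
          StructType(requiredSchema.fields ++ partitionSchema.fields))))
      }
    }

The call site in buildReader would then just destructure
Some((requestedColIds, resultSchemaString)) and set OrcConf.MAPRED_INPUT_SCHEMA from it,
the same way the current diff does.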
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]