[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r237093630

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala ---
@@ -126,35 +126,32 @@ abstract class PartitioningAwareFileIndex(
     val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
     val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
       .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
-    userPartitionSchema match {
+    val inferredPartitionSpec = PartitioningUtils.parsePartitions(
+      leafDirs,
+      typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
--- End diff --

Actually the investigation was done by the reporter of SPARK-26188, I did nothing... Thanks for doing that @gengliangwang and thanks for your comment @cloud-fan

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r237092452

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala ---
@@ -126,35 +126,32 @@ abstract class PartitioningAwareFileIndex(
     val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
     val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
       .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
-    userPartitionSchema match {
+    val inferredPartitionSpec = PartitioningUtils.parsePartitions(
+      leafDirs,
+      typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
--- End diff --

@mgaido91 Thanks for the investigation!! I will fix it and add a test case.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r237040743

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala ---
@@ -126,35 +126,32 @@ abstract class PartitioningAwareFileIndex(
     val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
     val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
       .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
-    userPartitionSchema match {
+    val inferredPartitionSpec = PartitioningUtils.parsePartitions(
+      leafDirs,
+      typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
--- End diff --

Before this patch, there was a subtle difference between with and without a user-provided partition schema:

1. with a user-provided partition schema, we should not infer data types. We should infer as string and cast to the user-provided type
2. without a user-provided partition schema, we should infer the data type (with a config)

So it was wrong to unify these 2 code paths. @gengliangwang can you change it back?
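The two code paths described in this comment can be sketched with a small, self-contained model. This is an illustration only, not Spark's real internals: `inferType`, `resolvePartitionValue`, and the string-based type names are hypothetical stand-ins.

```scala
// Hypothetical, trivial type inference: integer-looking values vs strings.
def inferType(raw: String): String =
  if (raw.nonEmpty && raw.forall(_.isDigit)) "int" else "string"

// Sketch of the two partition-value code paths described above:
// returns (type used for parsing, expression describing the final value).
def resolvePartitionValue(
    raw: String,
    userType: Option[String],        // user-provided partition column type, if any
    typeInferenceEnabled: Boolean): (String, String) =
  userType match {
    // 1. User-provided schema: never infer; parse as string, then cast.
    case Some(t) => ("string", s"CAST('$raw' AS $t)")
    // 2. No user schema: infer the data type, guarded by the config flag.
    case None if typeInferenceEnabled => (inferType(raw), raw)
    case None => ("string", raw)
  }
```

For example, with a user-provided `int` column, `resolvePartitionValue("2018", Some("int"), true)` yields `("string", "CAST('2018' AS int)")` — behavior 1 above — while without a user schema the same value is inferred as `int` when the config is on.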
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r236998030

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala ---
@@ -126,35 +126,32 @@ abstract class PartitioningAwareFileIndex(
     val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
     val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
       .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
-    userPartitionSchema match {
+    val inferredPartitionSpec = PartitioningUtils.parsePartitions(
+      leafDirs,
+      typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
--- End diff --

This is causing a behavior change in Spark 2.4.0, reported in SPARK-26188. Why did we need this change?
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21004
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r181289458

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -95,6 +95,14 @@ case class DataSource(
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
+  // The operations below are expensive therefore try not to do them if we don't need to, e.g.,
+  // in streaming mode, we have already inferred and registered partition columns, we will
+  // never have to materialize the lazy val below
+  private lazy val tempFileIndex = {
--- End diff --

Let's just inline it. People can still create a new index in the future; technically this can't prevent users from doing that.
Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r181283769

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -552,6 +523,40 @@ case class DataSource(
       sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
     }
   }
+
+  /** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */
+  private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
--- End diff --

No, we can't. In some cases we need to check the globbed paths without needing to create an `InMemoryFileIndex`.
Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r181283665

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -95,6 +95,14 @@ case class DataSource(
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
+  // The operations below are expensive therefore try not to do them if we don't need to, e.g.,
+  // in streaming mode, we have already inferred and registered partition columns, we will
+  // never have to materialize the lazy val below
+  private lazy val tempFileIndex = {
--- End diff --

I moved it here on purpose, so that it may avoid being created twice in the future. I am OK with inlining it.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r181269578

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -552,6 +523,40 @@ case class DataSource(
       sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
     }
   }
+
+  /** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */
+  private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
--- End diff --

And we can merge `checkAndGlobPathIfNecessary` and `createInMemoryFileIndex`.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r181269464

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -552,6 +523,40 @@ case class DataSource(
       sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
     }
   }
+
+  /** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */
+  private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
--- End diff --

This can be `def createInMemoryFileIndex(checkEmptyGlobPath: Boolean)`.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r181269313

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -384,24 +356,23 @@ case class DataSource(
 
       // This is a non-streaming file based datasource.
       case (format: FileFormat, _) =>
-        val allPaths = caseInsensitiveOptions.get("path") ++ paths
-        val hadoopConf = sparkSession.sessionState.newHadoopConf()
-        val globbedPaths = allPaths.flatMap(
-          DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, checkFilesExist)).toArray
-
-        val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
-
-        val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
-            catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
+        val globbedPaths =
+          checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
+        val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions &&
+          catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog &&
+          catalogTable.get.partitionSchema.nonEmpty
--- End diff --

Use `partitionColumnNames` over `partitionSchema`, since `partitionColumnNames` is a val and `partitionSchema` is a def.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r181269206

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -95,6 +95,14 @@ case class DataSource(
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
+  // The operations below are expensive therefore try not to do them if we don't need to, e.g.,
+  // in streaming mode, we have already inferred and registered partition columns, we will
+  // never have to materialize the lazy val below
+  private lazy val tempFileIndex = {
--- End diff --

It's only used once, no need to be a lazy val, we can just inline it.
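For context on the trade-off being discussed: a Scala `lazy val` is materialized at most once, on first access, and never if it goes unused — the property the original comment relies on for streaming mode. A minimal, self-contained illustration:

```scala
object LazyValDemo {
  var evaluations = 0

  // Evaluated at most once, on first access; never evaluated if unused.
  lazy val cached: Int = { evaluations += 1; 42 }

  def main(args: Array[String]): Unit = {
    assert(evaluations == 0)       // not yet materialized
    val a = cached
    val b = cached
    assert(a == 42 && b == 42)
    assert(evaluations == 1)       // body ran exactly once despite two reads
  }
}
```

Inlining the expression instead, as suggested, re-evaluates it at its (single) use site, which is equivalent here precisely because the value is only used once.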
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r181104239

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -200,10 +178,14 @@ case class DataSource(
     val dataSchema = userSpecifiedSchema.map { schema =>
       StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
     }.orElse {
+      val index = fileIndex match {
+        case i: InMemoryFileIndex => i
+        case _ => tempFileIndex
+      }
--- End diff --

why?
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r181103480

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -140,41 +130,29 @@ case class DataSource(
    * be any further inference in any triggers.
    *
    * @param format the file format object for this DataSource
-   * @param fileStatusCache the shared cache for file statuses to speed up listing
+   * @param optionalFileIndex optional [[FileIndex]] for getting partition schema and file list
    * @return A pair of the data schema (excluding partition columns) and the schema of the partition
    *         columns.
    */
   private def getOrInferFileFormatSchema(
       format: FileFormat,
-      fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = {
-    // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
-    // in streaming mode, we have already inferred and registered partition columns, we will
-    // never have to materialize the lazy val below
-    lazy val tempFileIndex = {
-      val allPaths = caseInsensitiveOptions.get("path") ++ paths
-      val hadoopConf = sparkSession.sessionState.newHadoopConf()
-      val globbedPaths = allPaths.toSeq.flatMap { path =>
-        val hdfsPath = new Path(path)
-        val fs = hdfsPath.getFileSystem(hadoopConf)
-        val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
-      }.toArray
-      new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
-    }
+      optionalFileIndex: Option[FileIndex] = None): (StructType, StructType) = {
--- End diff --

`existingFileIndex`
Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r181025558

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -384,13 +356,9 @@ case class DataSource(
 
       // This is a non-streaming file based datasource.
       case (format: FileFormat, _) =>
-        val allPaths = caseInsensitiveOptions.get("path") ++ paths
-        val hadoopConf = sparkSession.sessionState.newHadoopConf()
-        val globbedPaths = allPaths.flatMap(
-          DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, checkFilesExist)).toArray
-
-        val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
+        checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
--- End diff --

Yes. Originally it globbed twice too. I don't have a good solution to avoid this.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r180995890

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -384,13 +356,9 @@ case class DataSource(
 
       // This is a non-streaming file based datasource.
      case (format: FileFormat, _) =>
-        val allPaths = caseInsensitiveOptions.get("path") ++ paths
-        val hadoopConf = sparkSession.sessionState.newHadoopConf()
-        val globbedPaths = allPaths.flatMap(
-          DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, checkFilesExist)).toArray
-
-        val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
+        checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
--- End diff --

Now we may glob the path twice?
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r180995458

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -384,13 +358,9 @@ case class DataSource(
 
       // This is a non-streaming file based datasource.
      case (format: FileFormat, _) =>
-        val allPaths = caseInsensitiveOptions.get("path") ++ paths
-        val hadoopConf = sparkSession.sessionState.newHadoopConf()
-        val globbedPaths = allPaths.flatMap(
-          DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, checkFilesExist)).toArray
-
-        val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
+        checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
+        val (dataSchema, partitionSchema) =
+          getOrInferFileFormatSchema(format)
--- End diff --

Now it can be merged into the line above.
Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r180745018

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala ---
@@ -81,7 +81,7 @@ class PartitionProviderCompatibilitySuite
         HiveCatalogMetrics.reset()
         assert(spark.sql("select * from test where partCol < 2").count() == 2)
         assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2)
-        assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
+        assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 7)
--- End diff --

All the files are now listed once when creating the file index, so it is `5 + 2`.
Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r180744104

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala ---
@@ -126,35 +126,35 @@ abstract class PartitioningAwareFileIndex(
     val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
     val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
       .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
-    userPartitionSchema match {
+    val inferredPartitionSpec = PartitioningUtils.parsePartitions(
+      leafDirs,
+      typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
+      basePaths = basePaths,
+      timeZoneId = timeZoneId)
+    userSpecifiedSchema match {
       case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
-        val spec = PartitioningUtils.parsePartitions(
-          leafDirs,
-          typeInference = false,
-          basePaths = basePaths,
-          timeZoneId = timeZoneId)
+        val userPartitionSchema =
+          combineInferredAndUserSpecifiedPartitionSchema(inferredPartitionSpec)
 
-        // Without auto inference, all of value in the `row` should be null or in StringType,
         // we need to cast into the data type that user specified.
         def castPartitionValuesToUserSchema(row: InternalRow) = {
           InternalRow((0 until row.numFields).map { i =>
+            val expr = inferredPartitionSpec.partitionColumns.fields(i).dataType match {
+              case StringType => Literal.create(row.getUTF8String(i), StringType)
--- End diff --

`row.get(i, StringType)` throws an exception.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r180720924

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala ---
@@ -81,7 +81,7 @@ class PartitionProviderCompatibilitySuite
         HiveCatalogMetrics.reset()
         assert(spark.sql("select * from test where partCol < 2").count() == 2)
         assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2)
-        assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
+        assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 7)
--- End diff --

What happened here?
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r180720733

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala ---
@@ -126,35 +126,35 @@ abstract class PartitioningAwareFileIndex(
     val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
     val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
       .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
-    userPartitionSchema match {
+    val inferredPartitionSpec = PartitioningUtils.parsePartitions(
+      leafDirs,
+      typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
+      basePaths = basePaths,
+      timeZoneId = timeZoneId)
+    userSpecifiedSchema match {
       case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
-        val spec = PartitioningUtils.parsePartitions(
-          leafDirs,
-          typeInference = false,
-          basePaths = basePaths,
-          timeZoneId = timeZoneId)
+        val userPartitionSchema =
+          combineInferredAndUserSpecifiedPartitionSchema(inferredPartitionSpec)
 
-        // Without auto inference, all of value in the `row` should be null or in StringType,
         // we need to cast into the data type that user specified.
         def castPartitionValuesToUserSchema(row: InternalRow) = {
           InternalRow((0 until row.numFields).map { i =>
+            val expr = inferredPartitionSpec.partitionColumns.fields(i).dataType match {
+              case StringType => Literal.create(row.getUTF8String(i), StringType)
--- End diff --

Why special-case the string type?
Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21004#discussion_r179957491

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala ---
@@ -126,35 +126,35 @@ abstract class PartitioningAwareFileIndex(
     val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
     val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
       .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
-    userPartitionSchema match {
+    val inferredPartitionSpec = PartitioningUtils.parsePartitions(
+      leafDirs,
+      typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
+      basePaths = basePaths,
+      timeZoneId = timeZoneId)
+    userSpecifiedSchema match {
       case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
-        val spec = PartitioningUtils.parsePartitions(
-          leafDirs,
-          typeInference = false,
-          basePaths = basePaths,
-          timeZoneId = timeZoneId)
+        val userPartitionSchema =
+          combineInferredAndUserSpecifiedPartitionSchema(inferredPartitionSpec)
 
-        // Without auto inference, all of value in the `row` should be null or in StringType,
         // we need to cast into the data type that user specified.
         def castPartitionValuesToUserSchema(row: InternalRow) = {
           InternalRow((0 until row.numFields).map { i =>
+            val expr = inferredPartitionSpec.partitionColumns.fields(i).dataType match {
+              case StringType => Literal.create(row.getUTF8String(i), StringType)
+              case otherType => Literal.create(row.get(i, otherType))
--- End diff --

Here I am not very sure that all the other cases are covered.
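The shape of the cast logic being reviewed can be sketched as follows. This is a simplified sketch, not the PR's exact code: it assumes Spark's Catalyst classes (`InternalRow`, `Literal`, `Cast`) on the classpath, and `castRow` is an illustrative name. The string special case exists because an `InternalRow` stores strings as `UTF8String`, which must be read with `getUTF8String`; per the comment above, `row.get(i, StringType)` throws an exception.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types._

// Sketch: cast each inferred partition value to the user-specified type.
def castRow(
    row: InternalRow,
    inferred: StructType,       // schema the values were parsed with
    user: StructType,           // schema the user declared
    timeZoneId: String): InternalRow = {
  InternalRow.fromSeq((0 until row.numFields).map { i =>
    val lit = inferred.fields(i).dataType match {
      // Strings live in the row as UTF8String; read them with the typed accessor.
      case StringType => Literal.create(row.getUTF8String(i), StringType)
      case otherType  => Literal.create(row.get(i, otherType), otherType)
    }
    // Cast the literal to the user-provided type and evaluate it eagerly.
    Cast(lit, user.fields(i).dataType, Option(timeZoneId)).eval()
  })
}
```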
GitHub user gengliangwang opened a pull request:

    https://github.com/apache/spark/pull/21004

[SPARK-23896][SQL] Improve PartitioningAwareFileIndex

## What changes were proposed in this pull request?

Currently `PartitioningAwareFileIndex` accepts an optional parameter `userPartitionSchema`. If provided, it will combine the inferred partition schema with the parameter. However:

1. to get `userPartitionSchema`, we need to combine the inferred partition schema with `userSpecifiedSchema`
2. to get the inferred partition schema, we have to create a temporary file index.

Only after that can a final version of `PartitioningAwareFileIndex` be created.

This can be improved by passing `userSpecifiedSchema` to `PartitioningAwareFileIndex`. With the improvement, we can reduce redundant code and avoid parsing the file partitions twice.

## How was this patch tested?

Unit test

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gengliangwang/spark PartitioningAwareFileIndex

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21004.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21004

commit 35aff24743ff13ccd370a8e3747a3044e8a671c9
Author: Gengliang Wang
Date: 2018-04-08T18:19:48Z

    improve PartitioningAwareFileIndex
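The combination step the description refers to — merging an inferred partition schema with a user-specified one — can be pictured with a simplified model. Plain `(name, dataType)` string pairs stand in for `StructField`, and `combinePartitionSchema` is an illustrative name, not Spark's actual method:

```scala
// Simplified model of combining an inferred partition schema with a
// user-specified schema: keep the inferred column order, but take the
// user's declaration (name casing and type) for any column the user
// also specified; fall back to the inferred field otherwise.
def combinePartitionSchema(
    inferred: Seq[(String, String)],
    userSpecified: Seq[(String, String)]): Seq[(String, String)] =
  inferred.map { case (name, inferredType) =>
    userSpecified
      .find { case (userName, _) => userName.equalsIgnoreCase(name) }
      .getOrElse((name, inferredType))
  }
```

For instance, if partition discovery infers `part: int` but the user declared `part: string`, the combined schema keeps the user's `string` type, which is why the inferred values then need the cast step discussed in the review comments.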