[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-11-28 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r237093630
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 ---
@@ -126,35 +126,32 @@ abstract class PartitioningAwareFileIndex(
 val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
 val timeZoneId = 
caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
   .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
-userPartitionSchema match {
+val inferredPartitionSpec = PartitioningUtils.parsePartitions(
+  leafDirs,
+  typeInference = 
sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
--- End diff --

actually the investigation was done by the reporter of SPARK-26188, I did 
nothing... Thanks for doing that @gengliangwang, and thanks for your comment 
@cloud-fan 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-11-28 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r237092452
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 ---
@@ -126,35 +126,32 @@ abstract class PartitioningAwareFileIndex(
 val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
 val timeZoneId = 
caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
   .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
-userPartitionSchema match {
+val inferredPartitionSpec = PartitioningUtils.parsePartitions(
+  leafDirs,
+  typeInference = 
sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
--- End diff --

@mgaido91 Thanks for the investigation!!
I will fix it and add a test case.


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-11-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r237040743
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 ---
@@ -126,35 +126,32 @@ abstract class PartitioningAwareFileIndex(
 val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
 val timeZoneId = 
caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
   .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
-userPartitionSchema match {
+val inferredPartitionSpec = PartitioningUtils.parsePartitions(
+  leafDirs,
+  typeInference = 
sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
--- End diff --

Before this patch, there was a subtle difference between having and not having 
a user-provided partition schema:
1. with a user-provided partition schema, we should not infer data types; we 
should parse the values as strings and cast them to the user-provided types
2. without a user-provided partition schema, we should infer the data types 
(guarded by a config)

So it was wrong to unify these 2 code paths. @gengliangwang can you change 
it back?
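The two code paths above can be sketched roughly as follows. This is a hypothetical, simplified illustration in plain Scala: the real code operates on `InternalRow` via `Cast` expressions inside `PartitioningAwareFileIndex`, and the inference flag corresponds to `spark.sql.sources.partitionColumnTypeInference.enabled`.

```scala
// Hypothetical, simplified sketch of the two code paths; plain Scala
// values stand in for Spark's InternalRow and Cast machinery.
object PartitionValueDemo {
  import scala.util.Try

  // Path 1: user-provided partition schema -> parse the directory value
  // as a string, then cast it to the user-specified type.
  def castToUserType(raw: String, userType: String): Any = userType match {
    case "int"    => raw.toInt
    case "double" => raw.toDouble
    case _        => raw // default: keep the string
  }

  // Path 2: no user schema -> infer the type, guarded by a config flag.
  def inferType(raw: String, typeInference: Boolean): Any =
    if (typeInference) Try(raw.toInt).toOption.getOrElse(raw)
    else raw
}
```

Unifying the two paths silently switches path 1 from "parse as string, then cast" to "infer, then cast", which changes behavior for values like `0.1` cast to a decimal type.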


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-11-28 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r236998030
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 ---
@@ -126,35 +126,32 @@ abstract class PartitioningAwareFileIndex(
 val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
 val timeZoneId = 
caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
   .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
-userPartitionSchema match {
+val inferredPartitionSpec = PartitioningUtils.parsePartitions(
+  leafDirs,
+  typeInference = 
sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
--- End diff --

this is causing a behavior change in Spark 2.4.0, reported in SPARK-26188. 
Why did we need this change?


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-04-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21004


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-04-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r181289458
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -95,6 +95,14 @@ case class DataSource(
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
+  // The operations below are expensive therefore try not to do them if we 
don't need to, e.g.,
+  // in streaming mode, we have already inferred and registered partition 
columns, we will
+  // never have to materialize the lazy val below
+  private lazy val tempFileIndex = {
--- End diff --

let's just inline it. Technically this can't prevent people from creating a 
new index in the future anyway.


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-04-12 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r181283769
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -552,6 +523,40 @@ case class DataSource(
 sys.error(s"${providingClass.getCanonicalName} does not allow 
create table as select.")
 }
   }
+
+  /** Returns an [[InMemoryFileIndex]] that can be used to get partition 
schema and file list. */
+  private def createInMemoryFileIndex(globbedPaths: Seq[Path]): 
InMemoryFileIndex = {
--- End diff --

No, we can't. In some cases we need to check the globbed files without 
creating an `InMemoryFileIndex`.


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-04-12 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r181283665
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -95,6 +95,14 @@ case class DataSource(
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
+  // The operations below are expensive therefore try not to do them if we 
don't need to, e.g.,
+  // in streaming mode, we have already inferred and registered partition 
columns, we will
+  // never have to materialize the lazy val below
+  private lazy val tempFileIndex = {
--- End diff --

I moved it here on purpose, so it can avoid being created twice in the 
future.
I am OK with inlining it.


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-04-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r181269578
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -552,6 +523,40 @@ case class DataSource(
 sys.error(s"${providingClass.getCanonicalName} does not allow 
create table as select.")
 }
   }
+
+  /** Returns an [[InMemoryFileIndex]] that can be used to get partition 
schema and file list. */
+  private def createInMemoryFileIndex(globbedPaths: Seq[Path]): 
InMemoryFileIndex = {
--- End diff --

and we can merge `checkAndGlobPathIfNecessary` and `createInMemoryFileIndex`


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-04-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r181269464
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -552,6 +523,40 @@ case class DataSource(
 sys.error(s"${providingClass.getCanonicalName} does not allow 
create table as select.")
 }
   }
+
+  /** Returns an [[InMemoryFileIndex]] that can be used to get partition 
schema and file list. */
+  private def createInMemoryFileIndex(globbedPaths: Seq[Path]): 
InMemoryFileIndex = {
--- End diff --

this can be `def createInMemoryFileIndex(checkEmptyGlobPath: Boolean)`


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-04-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r181269313
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -384,24 +356,23 @@ case class DataSource(
 
   // This is a non-streaming file based datasource.
   case (format: FileFormat, _) =>
-val allPaths = caseInsensitiveOptions.get("path") ++ paths
-val hadoopConf = sparkSession.sessionState.newHadoopConf()
-val globbedPaths = allPaths.flatMap(
-  DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, 
checkFilesExist)).toArray
-
-val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-val (dataSchema, partitionSchema) = 
getOrInferFileFormatSchema(format, fileStatusCache)
-
-val fileCatalog = if 
(sparkSession.sqlContext.conf.manageFilesourcePartitions &&
-catalogTable.isDefined && 
catalogTable.get.tracksPartitionsInCatalog) {
+val globbedPaths =
+  checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, 
checkFilesExist = checkFilesExist)
+val useCatalogFileIndex = 
sparkSession.sqlContext.conf.manageFilesourcePartitions &&
+  catalogTable.isDefined && 
catalogTable.get.tracksPartitionsInCatalog &&
+  catalogTable.get.partitionSchema.nonEmpty
--- End diff --

use `partitionColumnNames` over `partitionSchema`, since 
`partitionColumnNames` is a val and `partitionSchema` is a def
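The val/def distinction matters here because a `def` body re-runs on every access while a `val` is computed once. A minimal sketch, using a hypothetical `TableMeta` class rather than Spark's actual `CatalogTable`:

```scala
// Hypothetical sketch (not Spark's CatalogTable) of the val/def
// distinction: a def body re-runs on every access, a val is computed once.
object ValVsDefDemo {
  class TableMeta(columns: Seq[String]) {
    var defEvaluations = 0
    // computed once, at construction time
    val partitionColumnNames: Seq[String] = columns.filter(_.startsWith("part"))
    // recomputed on every call
    def partitionSchema: Seq[String] = {
      defEvaluations += 1
      columns.filter(_.startsWith("part"))
    }
  }
}
```

On a hot path, every access to the `def` pays the filtering cost again, so the `val` is the cheaper member to reference.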


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-04-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r181269206
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -95,6 +95,14 @@ case class DataSource(
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
+  // The operations below are expensive therefore try not to do them if we 
don't need to, e.g.,
+  // in streaming mode, we have already inferred and registered partition 
columns, we will
+  // never have to materialize the lazy val below
+  private lazy val tempFileIndex = {
--- End diff --

it's only used once, so there's no need for a lazy val; we can just inline it.
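A `lazy val` evaluates its body at most once, on first access, so for a single use site it is behaviorally equivalent to evaluating the expression inline. A minimal sketch (names are illustrative, not Spark's):

```scala
// Minimal demo of lazy val semantics: the body runs at most once, and
// only when first accessed. With exactly one use site, inlining the
// expression gives the same behavior.
object LazyValDemo {
  var evaluations = 0
  def buildIndex(): String = { evaluations += 1; "index" }
  lazy val tempIndex: String = buildIndex()   // nothing evaluated yet
}
```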


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-04-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r181104239
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -200,10 +178,14 @@ case class DataSource(
 val dataSchema = userSpecifiedSchema.map { schema =>
   StructType(schema.filterNot(f => partitionSchema.exists(p => 
equality(p.name, f.name
 }.orElse {
+  val index = fileIndex match {
+case i: InMemoryFileIndex => i
+case _ => tempFileIndex
+  }
--- End diff --

why?


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-04-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r181103480
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -140,41 +130,29 @@ case class DataSource(
* be any further inference in any triggers.
*
* @param format the file format object for this DataSource
-   * @param fileStatusCache the shared cache for file statuses to speed up 
listing
+   * @param optionalFileIndex optional [[FileIndex]] for getting partition 
schema and file list
* @return A pair of the data schema (excluding partition columns) and 
the schema of the partition
* columns.
*/
   private def getOrInferFileFormatSchema(
   format: FileFormat,
-  fileStatusCache: FileStatusCache = NoopCache): (StructType, 
StructType) = {
-// the operations below are expensive therefore try not to do them if 
we don't need to, e.g.,
-// in streaming mode, we have already inferred and registered 
partition columns, we will
-// never have to materialize the lazy val below
-lazy val tempFileIndex = {
-  val allPaths = caseInsensitiveOptions.get("path") ++ paths
-  val hadoopConf = sparkSession.sessionState.newHadoopConf()
-  val globbedPaths = allPaths.toSeq.flatMap { path =>
-val hdfsPath = new Path(path)
-val fs = hdfsPath.getFileSystem(hadoopConf)
-val qualified = hdfsPath.makeQualified(fs.getUri, 
fs.getWorkingDirectory)
-SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
-  }.toArray
-  new InMemoryFileIndex(sparkSession, globbedPaths, options, None, 
fileStatusCache)
-}
+  optionalFileIndex: Option[FileIndex] = None): (StructType, 
StructType) = {
--- End diff --

`existingFileIndex`


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-04-12 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r181025558
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -384,13 +356,9 @@ case class DataSource(
 
   // This is a non-streaming file based datasource.
   case (format: FileFormat, _) =>
-val allPaths = caseInsensitiveOptions.get("path") ++ paths
-val hadoopConf = sparkSession.sessionState.newHadoopConf()
-val globbedPaths = allPaths.flatMap(
-  DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, 
checkFilesExist)).toArray
-
-val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-val (dataSchema, partitionSchema) = 
getOrInferFileFormatSchema(format, fileStatusCache)
+checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, 
checkFilesExist = checkFilesExist)
--- End diff --

Yes. Originally it globbed twice too. I don't have a good solution to avoid 
this.


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-04-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r180995890
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -384,13 +356,9 @@ case class DataSource(
 
   // This is a non-streaming file based datasource.
   case (format: FileFormat, _) =>
-val allPaths = caseInsensitiveOptions.get("path") ++ paths
-val hadoopConf = sparkSession.sessionState.newHadoopConf()
-val globbedPaths = allPaths.flatMap(
-  DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, 
checkFilesExist)).toArray
-
-val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-val (dataSchema, partitionSchema) = 
getOrInferFileFormatSchema(format, fileStatusCache)
+checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, 
checkFilesExist = checkFilesExist)
--- End diff --

now we may glob the path twice?


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-04-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r180995458
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -384,13 +358,9 @@ case class DataSource(
 
   // This is a non-streaming file based datasource.
   case (format: FileFormat, _) =>
-val allPaths = caseInsensitiveOptions.get("path") ++ paths
-val hadoopConf = sparkSession.sessionState.newHadoopConf()
-val globbedPaths = allPaths.flatMap(
-  DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, 
checkFilesExist)).toArray
-
-val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-val (dataSchema, partitionSchema) = 
getOrInferFileFormatSchema(format, fileStatusCache)
+checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, 
checkFilesExist = checkFilesExist)
+val (dataSchema, partitionSchema) =
+  getOrInferFileFormatSchema(format)
--- End diff --

now it can be merged to the above line


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-04-11 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r180745018
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
 ---
@@ -81,7 +81,7 @@ class PartitionProviderCompatibilitySuite
   HiveCatalogMetrics.reset()
   assert(spark.sql("select * from test where partCol < 2").count() 
== 2)
   assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() 
== 2)
-  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
2)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
7)
--- End diff --

all the files need to be listed once to create the file index, so it is `5 + 
2`


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-04-11 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r180744104
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 ---
@@ -126,35 +126,35 @@ abstract class PartitioningAwareFileIndex(
 val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
 val timeZoneId = 
caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
   .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
-userPartitionSchema match {
+val inferredPartitionSpec = PartitioningUtils.parsePartitions(
+  leafDirs,
+  typeInference = 
sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
+  basePaths = basePaths,
+  timeZoneId = timeZoneId)
+userSpecifiedSchema match {
   case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
-val spec = PartitioningUtils.parsePartitions(
-  leafDirs,
-  typeInference = false,
-  basePaths = basePaths,
-  timeZoneId = timeZoneId)
+val userPartitionSchema =
+  
combineInferredAndUserSpecifiedPartitionSchema(inferredPartitionSpec)
 
-// Without auto inference, all of value in the `row` should be 
null or in StringType,
 // we need to cast into the data type that user specified.
 def castPartitionValuesToUserSchema(row: InternalRow) = {
   InternalRow((0 until row.numFields).map { i =>
+val expr = 
inferredPartitionSpec.partitionColumns.fields(i).dataType match {
+  case StringType => Literal.create(row.getUTF8String(i), 
StringType)
--- End diff --

`row.get(i, StringType)` throws an exception


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-04-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r180720924
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
 ---
@@ -81,7 +81,7 @@ class PartitionProviderCompatibilitySuite
   HiveCatalogMetrics.reset()
   assert(spark.sql("select * from test where partCol < 2").count() 
== 2)
   assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() 
== 2)
-  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
2)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
7)
--- End diff --

what happened here?


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-04-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r180720733
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 ---
@@ -126,35 +126,35 @@ abstract class PartitioningAwareFileIndex(
 val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
 val timeZoneId = 
caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
   .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
-userPartitionSchema match {
+val inferredPartitionSpec = PartitioningUtils.parsePartitions(
+  leafDirs,
+  typeInference = 
sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
+  basePaths = basePaths,
+  timeZoneId = timeZoneId)
+userSpecifiedSchema match {
   case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
-val spec = PartitioningUtils.parsePartitions(
-  leafDirs,
-  typeInference = false,
-  basePaths = basePaths,
-  timeZoneId = timeZoneId)
+val userPartitionSchema =
+  
combineInferredAndUserSpecifiedPartitionSchema(inferredPartitionSpec)
 
-// Without auto inference, all of value in the `row` should be 
null or in StringType,
 // we need to cast into the data type that user specified.
 def castPartitionValuesToUserSchema(row: InternalRow) = {
   InternalRow((0 until row.numFields).map { i =>
+val expr = 
inferredPartitionSpec.partitionColumns.fields(i).dataType match {
+  case StringType => Literal.create(row.getUTF8String(i), 
StringType)
--- End diff --

why special case string type?


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-04-08 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/21004#discussion_r179957491
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 ---
@@ -126,35 +126,35 @@ abstract class PartitioningAwareFileIndex(
 val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
 val timeZoneId = 
caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
   .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
-userPartitionSchema match {
+val inferredPartitionSpec = PartitioningUtils.parsePartitions(
+  leafDirs,
+  typeInference = 
sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
+  basePaths = basePaths,
+  timeZoneId = timeZoneId)
+userSpecifiedSchema match {
   case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
-val spec = PartitioningUtils.parsePartitions(
-  leafDirs,
-  typeInference = false,
-  basePaths = basePaths,
-  timeZoneId = timeZoneId)
+val userPartitionSchema =
+  
combineInferredAndUserSpecifiedPartitionSchema(inferredPartitionSpec)
 
-// Without auto inference, all of value in the `row` should be 
null or in StringType,
 // we need to cast into the data type that user specified.
 def castPartitionValuesToUserSchema(row: InternalRow) = {
   InternalRow((0 until row.numFields).map { i =>
+val expr = 
inferredPartitionSpec.partitionColumns.fields(i).dataType match {
+  case StringType => Literal.create(row.getUTF8String(i), 
StringType)
+  case otherType => Literal.create(row.get(i, otherType))
--- End diff --

Here I am not very sure that all the other cases are covered.


---




[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...

2018-04-08 Thread gengliangwang
GitHub user gengliangwang opened a pull request:

https://github.com/apache/spark/pull/21004

[SPARK-23896][SQL]Improve PartitioningAwareFileIndex

## What changes were proposed in this pull request?

Currently `PartitioningAwareFileIndex` accepts an optional parameter 
`userPartitionSchema`. If provided, it will combine the inferred partition 
schema with the parameter.

However,
1. to get `userPartitionSchema`, we need to combine the inferred partition 
schema with `userSpecifiedSchema`
2. to get the inferred partition schema, we have to create a temporary file 
index.

Only after that can a final version of `PartitioningAwareFileIndex` be 
created.

This can be improved by passing `userSpecifiedSchema` directly to 
`PartitioningAwareFileIndex`.

With the improvement, we can reduce redundant code and avoid parsing the 
partition directories twice. 
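The saving can be sketched with a counter standing in for the expensive file listing. All names here are illustrative, not Spark's real API:

```scala
// Hedged sketch of the refactoring idea: before, a temporary index
// infers the partition schema and the final index lists again; after,
// userSpecifiedSchema is passed down and the index lists once.
object FileIndexRefactorDemo {
  class Lister {
    var listings = 0
    def inferPartitionSchema(): Seq[String] = { listings += 1; Seq("partCol") }
  }

  // Before: infer via a throwaway index, combine with the user schema,
  // then the final index performs its own listing.
  def buildBefore(userSchema: Option[Seq[String]], lister: Lister): Seq[String] = {
    val inferred = lister.inferPartitionSchema()   // temporary index
    val combined = userSchema.getOrElse(inferred)
    lister.inferPartitionSchema()                  // final index lists again
    combined
  }

  // After: the index receives userSpecifiedSchema and lists only once.
  def buildAfter(userSchema: Option[Seq[String]], lister: Lister): Seq[String] =
    userSchema.getOrElse(lister.inferPartitionSchema())
}
```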
## How was this patch tested?
Unit test


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gengliangwang/spark PartitioningAwareFileIndex

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21004.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21004


commit 35aff24743ff13ccd370a8e3747a3044e8a671c9
Author: Gengliang Wang 
Date:   2018-04-08T18:19:48Z

improve PartitioningAwareFileIndex




---
