Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14864#discussion_r77122392
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
    @@ -156,24 +156,56 @@ case class FileSourceScanExec(
         false
       }
     
    -  override val outputPartitioning: Partitioning = {
    +  override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
         val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
           relation.bucketSpec
         } else {
           None
         }
    -    bucketSpec.map { spec =>
    -      val numBuckets = spec.numBuckets
    -      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
    -        output.find(_.name == n)
    -      }
    -      if (bucketColumns.size == spec.bucketColumnNames.size) {
    -        HashPartitioning(bucketColumns, numBuckets)
    -      } else {
    -        UnknownPartitioning(0)
    -      }
    -    }.getOrElse {
    -      UnknownPartitioning(0)
    +    bucketSpec match {
    +      case Some(spec) =>
    +        val numBuckets = spec.numBuckets
    +        val bucketColumns = spec.bucketColumnNames.flatMap { n =>
    +          output.find(_.name == n)
    +        }
    +        if (bucketColumns.size == spec.bucketColumnNames.size) {
    +          val partitioning = HashPartitioning(bucketColumns, numBuckets)
    +
    +          val sortOrder = if (spec.sortColumnNames.nonEmpty) {
    +            // In case of bucketing, its possible to have multiple files belonging to the
    +            // same bucket in a given relation. Each of these files are locally sorted
    +            // but those files combined together are not globally sorted. Given that,
    +            // the RDD partition will not be sorted even if the relation has sort columns set
    +            // Current solution is to check if all the buckets have a single file in it
    +
    +            val files =
    +              relation.location.listFiles(partitionFilters).flatMap(partition => partition.files)
    +            val bucketToFilesGrouping =
    +              files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
    +            val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
    --- End diff --
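
    For reference, the check being discussed above boils down to grouping the relation's data files by the bucket id encoded in their file names and requiring at most one file per bucket. A minimal standalone sketch (the helper name allBucketsHaveSingleFile is made up; it assumes only a list of leaf-file names):

        import org.apache.spark.sql.execution.datasources.BucketingUtils

        // Files within a bucket are each sorted locally, but several files per bucket
        // do not form one globally sorted run, so the sort order can only be reported
        // when every bucket id maps to at most one file.
        def allBucketsHaveSingleFile(fileNames: Seq[String]): Boolean =
          fileNames
            .groupBy(name => BucketingUtils.getBucketId(name))
            .forall { case (_, filesInBucket) => filesInBucket.length <= 1 }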
    
    Yeah, that's a good question. A single file per bucket looks more reasonable; it's more important to read a bucketed table fast than to write it fast. But what about data insertion? Does Hive support inserting into a bucketed table?
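
    On the insert side (leaving the Hive question aside), Spark's own writer API can already create bucketed, sorted tables, and as far as I know each writing task emits its own file per bucket, which is exactly how a bucket ends up with several locally sorted files. A minimal sketch, with a made-up table name:

        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder().getOrCreate()

        // Write a bucketed, sorted table through the public DataFrameWriter API.
        // Without arranging the input first, a bucket may end up with one file per
        // writing task rather than exactly one file overall.
        spark.range(1000)
          .write
          .bucketBy(8, "id")
          .sortBy("id")
          .saveAsTable("bucketed_ids")

    If exactly one file per bucket were required, the input would have to be arranged so that a single task writes each bucket, e.g. by shuffling on the bucket columns before the write.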

