Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48610578
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -344,66 +371,129 @@ private[sql] class DynamicPartitionWriterContainer(
     
         // If anything below fails, we should abort the task.
         try {
    -      // This will be filled in if we have to fall back on sorting.
    -      var sorter: UnsafeKVExternalSorter = null
    -      while (iterator.hasNext && sorter == null) {
    -        val inputRow = iterator.next()
    -        val currentKey = getPartitionKey(inputRow)
    -        var currentWriter = outputWriters.get(currentKey)
    -
    -        if (currentWriter == null) {
    -          if (outputWriters.size < maxOpenFiles) {
    -            currentWriter = newOutputWriter(currentKey)
    -            outputWriters.put(currentKey.copy(), currentWriter)
    -            currentWriter.writeInternal(getOutputRow(inputRow))
    -          } else {
    -            logInfo(s"Maximum partitions reached, falling back on 
sorting.")
    -            sorter = new UnsafeKVExternalSorter(
    -              StructType.fromAttributes(partitionColumns),
    -              StructType.fromAttributes(dataColumns),
    -              SparkEnv.get.blockManager,
    -              TaskContext.get().taskMemoryManager().pageSizeBytes)
    -            sorter.insertKV(currentKey, getOutputRow(inputRow))
    +      val mustSort = bucketSpec.isDefined && 
bucketSpec.get.sortingColumns.isDefined
    +      // TODO: remove duplicated code.
    +      if (mustSort) {
    +        val bucketColumns = 
bucketSpec.get.resolvedBucketingColumns(inputSchema)
    +        val sortColumns = 
bucketSpec.get.resolvedSortingColumns(inputSchema)
    +
    +        val getSortingKey = {
    +          val getBucketKey = UnsafeProjection.create(bucketColumns, 
inputSchema)
    +          val getResultRow = UnsafeProjection.create(
    +            (partitionColumns :+ Literal(-1)) ++ sortColumns, inputSchema)
    +          (row: InternalRow) => {
    +            val bucketId = math.abs(getBucketKey(row).hashCode()) % 
bucketSpec.get.numBuckets
    +            val result = getResultRow(row)
    +            result.setInt(partitionColumns.length, bucketId)
    +            result
               }
    -        } else {
    -          currentWriter.writeInternal(getOutputRow(inputRow))
             }
    -      }
     
    -      // If the sorter is not null that means that we reached the maxFiles 
above and need to finish
    -      // using external sort.
    -      if (sorter != null) {
    +        val sortingKeySchema = {
    +          val fields = StructType.fromAttributes(partitionColumns)
    +            .add("bucketId", IntegerType, nullable = false) ++
    +            StructType.fromAttributes(sortColumns)
    +          StructType(fields)
    +        }
    +
    +        val sorter = new UnsafeKVExternalSorter(
    +          sortingKeySchema,
    +          StructType.fromAttributes(dataColumns),
    +          SparkEnv.get.blockManager,
    +          TaskContext.get().taskMemoryManager().pageSizeBytes)
    +
             while (iterator.hasNext) {
               val currentRow = iterator.next()
    -          sorter.insertKV(getPartitionKey(currentRow), 
getOutputRow(currentRow))
    +          sorter.insertKV(getSortingKey(currentRow), 
getOutputRow(currentRow))
             }
     
             logInfo(s"Sorting complete. Writing out partition files one at a 
time.")
     
    +        def sameBucket(row1: InternalRow, row2: InternalRow): Boolean = {
    +          partitionColumns.map(_.dataType).zipWithIndex.forall { case (dt, 
index) =>
    +            row1.get(index, dt) == row2.get(index, dt)
    +          } && row1.getInt(partitionColumns.length) == 
row2.getInt(partitionColumns.length)
    +        }
    --- End diff --
    
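    Please avoid using functional style on performance-critical code paths. Here, `sameBucket` re-evaluates the `map`/`zipWithIndex`/`forall` chain (and allocates its intermediate collections) on every call; a plain `while` loop over the partition column indices would avoid that.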


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to