Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/16898#discussion_r101320333
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
---
@@ -363,80 +393,42 @@ object FileFormatWriter extends Logging {
committer.newTaskTempFile(taskAttemptContext, partDir, ext)
}
- currentWriter = description.outputWriterFactory.newInstance(
+ currentWriter = desc.outputWriterFactory.newInstance(
path = path,
- dataSchema = description.dataColumns.toStructType,
+ dataSchema = desc.dataColumns.toStructType,
context = taskAttemptContext)
}
override def execute(iter: Iterator[InternalRow]): Set[String] = {
- // We should first sort by partition columns, then bucket id, and
finally sorting columns.
- val sortingExpressions: Seq[Expression] =
- description.partitionColumns ++ bucketIdExpression ++ sortColumns
- val getSortingKey = UnsafeProjection.create(sortingExpressions,
description.allColumns)
-
- val sortingKeySchema = StructType(sortingExpressions.map {
- case a: Attribute => StructField(a.name, a.dataType, a.nullable)
- // The sorting expressions are all `Attribute` except bucket id.
- case _ => StructField("bucketId", IntegerType, nullable = false)
- })
+ val getPartitionColsAndBucketId = UnsafeProjection.create(
+ desc.partitionColumns ++ bucketIdExpression, desc.allColumns)
- // Returns the data columns to be written given an input row
- val getOutputRow = UnsafeProjection.create(
- description.dataColumns, description.allColumns)
-
- // Returns the partition path given a partition key.
- val getPartitionStringFunc = UnsafeProjection.create(
- Seq(Concat(partitionStringExpression)),
description.partitionColumns)
-
- // Sorts the data before write, so that we only need one writer at
the same time.
- val sorter = new UnsafeKVExternalSorter(
- sortingKeySchema,
- StructType.fromAttributes(description.dataColumns),
- SparkEnv.get.blockManager,
- SparkEnv.get.serializerManager,
- TaskContext.get().taskMemoryManager().pageSizeBytes,
-
SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
- UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
+ // Generates the partition path given the row generated by
`getPartitionColsAndBucketId`.
+ val getPartPath = UnsafeProjection.create(
+ Seq(Concat(partitionPathExpression)), desc.partitionColumns)
- while (iter.hasNext) {
- val currentRow = iter.next()
- sorter.insertKV(getSortingKey(currentRow),
getOutputRow(currentRow))
- }
-
- val getBucketingKey: InternalRow => InternalRow = if
(sortColumns.isEmpty) {
- identity
- } else {
-
UnsafeProjection.create(sortingExpressions.dropRight(sortColumns.length).zipWithIndex.map
{
- case (expr, ordinal) => BoundReference(ordinal, expr.dataType,
expr.nullable)
- })
- }
-
- val sortedIterator = sorter.sortedIterator()
+ // Returns the data columns to be written given an input row
+ val getOutputRow = UnsafeProjection.create(desc.dataColumns,
desc.allColumns)
// If anything below fails, we should abort the task.
var recordsInFile: Long = 0L
var fileCounter = 0
- var currentKey: UnsafeRow = null
+ var currentPartColsAndBucketId: UnsafeRow = null
val updatedPartitions = mutable.Set[String]()
- while (sortedIterator.next()) {
- val nextKey =
getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
- if (currentKey != nextKey) {
- // See a new key - write to a new partition (new file).
- currentKey = nextKey.copy()
- logDebug(s"Writing partition: $currentKey")
+ for (row <- iter) {
+ val nextPartColsAndBucketId = getPartitionColsAndBucketId(row)
--- End diff --
If you take a look at `GenerateUnsafeProjection`, you will see that it actually
reuses the same row instance for every call, so we need to copy the row here.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]