Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/7514#discussion_r35993986
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
---
@@ -244,34 +244,65 @@ private[sql] case class InsertIntoHadoopFsRelation(
}
    def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
+      // Track which rows have been output to disk so that if a data sort is necessary mid-write,
+      // we don't end up outputting the same data twice
+      val writtenRows: HashSet[InternalRow] = new HashSet[InternalRow]
+
+      // Flag to track whether data has been sorted, in which case it's safe to close previously
+      // used outputWriters
+      var sorted: Boolean = false
+
       // If anything below fails, we should abort the task.
       try {
         writerContainer.executorSideSetup(taskContext)
-        // Projects all partition columns and casts them to strings to build partition directories.
-        val partitionCasts = partitionOutput.map(Cast(_, StringType))
-        val partitionProj = newProjection(codegenEnabled, partitionCasts, output)
-        val dataProj = newProjection(codegenEnabled, dataOutput, output)
+        // Sort the data by partition so that it's possible to use a single outputWriter at a
+        // time to process the incoming data
+        def sortRows(iterator: Iterator[InternalRow]): Iterator[InternalRow] = {
+          // Sort by the same key used to look up the outputWriter to allow us to recycle the writer
+          iterator.toArray.sortBy(writerContainer.computePartitionPath).toIterator
--- End diff ---
There could be many records, too many to hold in memory, so we should use an
external sort here instead of `iterator.toArray`.
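
For illustration, here is a minimal, self-contained sketch of the external-sort idea being suggested: buffer a bounded number of entries, spill each full buffer to disk as a sorted run, and then lazily merge the runs. This is not Spark's own sorter; the object name `ExternalSortSketch`, the method `externalSort`, and the `maxInMemory` threshold are hypothetical, and it sorts plain strings (think partition paths) rather than `InternalRow`s keyed by `writerContainer.computePartitionPath`.

```scala
import java.io.{BufferedReader, BufferedWriter, File, FileReader, FileWriter, PrintWriter}
import scala.collection.mutable

object ExternalSortSketch {

  // Sort an arbitrarily large iterator of keys without holding everything in memory:
  // buffer up to `maxInMemory` entries, spill each full buffer to disk as a sorted run,
  // then lazily merge the sorted runs.
  def externalSort(input: Iterator[String], maxInMemory: Int = 10000): Iterator[String] = {
    val spills = mutable.ArrayBuffer.empty[File]
    val buffer = mutable.ArrayBuffer.empty[String]

    // Write the current buffer to a temporary file as one sorted run.
    def spill(): Unit = {
      val file = File.createTempFile("sort-spill", ".tmp")
      file.deleteOnExit()
      val out = new PrintWriter(new BufferedWriter(new FileWriter(file)))
      try buffer.sorted.foreach(line => out.println(line)) finally out.close()
      spills += file
      buffer.clear()
    }

    input.foreach { key =>
      buffer += key
      if (buffer.size >= maxInMemory) spill()
    }
    if (buffer.nonEmpty) spill()

    // K-way merge: a priority queue holds the current head of each run,
    // ordered so the smallest key is dequeued first.
    val readers = spills.map(f => new BufferedReader(new FileReader(f)))
    implicit val minFirst: Ordering[(String, Int)] =
      Ordering.by[(String, Int), String](_._1).reverse
    val heap = mutable.PriorityQueue.empty[(String, Int)]
    readers.zipWithIndex.foreach { case (r, i) =>
      val line = r.readLine()
      if (line != null) heap.enqueue((line, i))
    }

    new Iterator[String] {
      def hasNext: Boolean = heap.nonEmpty
      def next(): String = {
        val (value, i) = heap.dequeue()
        val next = readers(i).readLine()
        if (next != null) heap.enqueue((next, i))
        value
      }
    }
  }
}
```

In the context of this PR the key would presumably be the partition path computed per row, with whole rows as values, and Spark already ships spill-to-disk sorting machinery (e.g. `org.apache.spark.util.collection.ExternalSorter`) that would normally be used instead of hand-rolling anything like the sketch above.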