c21 commented on a change in pull request #32198:
URL: https://github.com/apache/spark/pull/32198#discussion_r618177561
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
##########
@@ -255,25 +331,184 @@ class DynamicPartitionDataWriter(
}
if (isBucketed) {
currentBucketId = nextBucketId
- statsTrackers.foreach(_.newBucket(currentBucketId.get))
}
fileCounter = 0
- newOutputWriter(currentPartitionValues, currentBucketId)
+ newOutputWriter(currentPartitionValues, currentBucketId, true)
} else if (description.maxRecordsPerFile > 0 &&
recordsInFile >= description.maxRecordsPerFile) {
- // Exceeded the threshold in terms of the number of records per file.
- // Create a new file by increasing the file counter.
- fileCounter += 1
- assert(fileCounter < MAX_FILE_COUNTER,
- s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER")
+ increaseFileCounter(currentPartitionValues, currentBucketId)
+ }
+ writeRecord(record)
+ }
+}
+
+/**
+ * Dynamic partition writer with concurrent writers, meaning multiple
concurrent writers are opened
+ * for writing.
+ *
+ * The process has the following steps:
+ * - Step 1: Maintain a map of output writers per each partition and/or
bucket columns. Keep all
+ * writers opened and write rows one by one.
+ * - Step 2: If number of concurrent writers exceeds limit, sort rest of rows
on partition and/or
+ * bucket column(s). Write rows one by one, and eagerly close the
writer when finishing
+ * each partition and/or bucket.
+ *
+ * Caller is expected to call `writeWithIterator()` instead of `write()` to
write records.
+ */
+class DynamicPartitionDataConcurrentWriter(
+ description: WriteJobDescription,
+ taskAttemptContext: TaskAttemptContext,
+ committer: FileCommitProtocol,
+ concurrentOutputWriterSpec: ConcurrentOutputWriterSpec)
+ extends BaseDynamicPartitionDataWriter(description, taskAttemptContext,
committer) {
+
+ /** Wrapper class to index a unique concurrent output writer. */
+ private case class WriterIndex(
+ var partitionValues: Option[UnsafeRow],
+ var bucketId: Option[Int])
+
+ /** Wrapper class for status of a unique concurrent output writer. */
+ private class WriterStatus(
+ var outputWriter: OutputWriter,
+ var recordsInFile: Long,
+ var fileCounter: Int)
+
+ /**
+ * State to indicate if we are falling back to sort-based writer.
+ * Because we first try to use concurrent writers, its initial value is
false.
+ */
+ private var sortBased: Boolean = false
+ private val concurrentWriters = mutable.HashMap[WriterIndex, WriterStatus]()
+ private val currentWriterId = WriterIndex(None, None)
Review comment:
> Can we always create a new WriterIndex instance when needed and make
WriterIndex immutable?
I am worried that we would need to create new `WriterIndex` instances very
often: the data layout has no order, so consecutive rows may keep switching
between writers. In the worst case we would create a new `WriterIndex` per row
just to look it up in the map, which could cause GC pressure.
If the concern is correctness, does the test in
`DataFrameReaderWriterSuite.scala` address it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]