Github user dafrista commented on a diff in the pull request:
https://github.com/apache/spark/pull/13382#discussion_r65287494
--- Diff:
core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala ---
@@ -46,66 +46,82 @@ private[spark] class DiskBlockObjectWriter(
extends OutputStream
with Logging {
+ /**
+ * Guards against close calls, e.g. from a wrapping stream.
+ * Call manualClose to close the stream that was extended by this trait.
+ */
+ private trait ManualCloseOutputStream extends OutputStream {
+ abstract override def close(): Unit = {
+ flush()
+ }
+
+ def manualClose(): Unit = {
+ super.close()
+ }
+ }
+
/** The file channel, used for repositioning / truncating the file. */
private var channel: FileChannel = null
+ private var mcs: ManualCloseOutputStream = null
private var bs: OutputStream = null
private var fos: FileOutputStream = null
private var ts: TimeTrackingOutputStream = null
private var objOut: SerializationStream = null
private var initialized = false
+ private var streamOpen = false
private var hasBeenClosed = false
- private var commitAndCloseHasBeenCalled = false
/**
* Cursors used to represent positions in the file.
*
- * xxxxxxxx|--------|--- |
- * ^ ^ ^
- * | | finalPosition
- * | reportedPosition
- * initialPosition
+ * xxxxxxxx|--------|---|
+ * ^ ^
+ * | committedPosition
+ * reportedPosition
*
- * initialPosition: Offset in the file where we start writing. Immutable.
* reportedPosition: Position at the time of the last update to the write metrics.
- * finalPosition: Offset where we stopped writing. Set on closeAndCommit() then never changed.
+ * committedPosition: Offset after last committed write.
* -----: Current writes to the underlying file.
* xxxxx: Existing contents of the file.
*/
- private val initialPosition = file.length()
- private var finalPosition: Long = -1
- private var reportedPosition = initialPosition
+ private var committedPosition = file.length()
+ private var reportedPosition = committedPosition
/**
* Keep track of number of records written and also use this to periodically
* output bytes written since the latter is expensive to do for each record.
*/
private var numRecordsWritten = 0
+ private def initialize(): Unit = {
+ fos = new FileOutputStream(file, true)
+ channel = fos.getChannel()
+ ts = new TimeTrackingOutputStream(writeMetrics, fos)
+ class ManualCloseBufferedOutputStream
+ extends BufferedOutputStream(ts, bufferSize) with ManualCloseOutputStream
+ mcs = new ManualCloseBufferedOutputStream
+ }
+
def open(): DiskBlockObjectWriter = {
if (hasBeenClosed) {
throw new IllegalStateException("Writer already closed. Cannot be reopened.")
}
- fos = new FileOutputStream(file, true)
- ts = new TimeTrackingOutputStream(writeMetrics, fos)
- channel = fos.getChannel()
- bs = compressStream(new BufferedOutputStream(ts, bufferSize))
+ if (!initialized) {
+ initialize()
+ initialized = true
+ }
+ bs = compressStream(mcs)
objOut = serializerInstance.serializeStream(bs)
- initialized = true
+ streamOpen = true
this
}
override def close() {
if (initialized) {
Utils.tryWithSafeFinally {
- if (syncWrites) {
- // Force outstanding writes to disk and track how long it takes
- objOut.flush()
- val start = System.nanoTime()
- fos.getFD.sync()
- writeMetrics.incWriteTime(System.nanoTime() - start)
- }
+ commit()
--- End diff --
This is interesting. Making `close()` a no-op breaks my understanding of
`OutputStream`, where I expect `close()` to (a) write out any buffered data and
(b) clean up resources. But I don't see anything that definitively prescribes
this either.
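For reference, here is a minimal sketch of how the guard trait from the diff behaves when mixed into a `BufferedOutputStream`. `TrackingSink` and `ManualCloseDemo` are hypothetical names used only for illustration; the trait body mirrors the one in the diff.

```scala
import java.io.{BufferedOutputStream, ByteArrayOutputStream, OutputStream}

// Same shape as the trait in the diff: close() only flushes,
// manualClose() performs the real close of the extended stream.
trait ManualCloseOutputStream extends OutputStream {
  abstract override def close(): Unit = flush()
  def manualClose(): Unit = super.close()
}

// Hypothetical sink that records whether close() actually reached it.
class TrackingSink extends ByteArrayOutputStream {
  var reallyClosed = false
  override def close(): Unit = { reallyClosed = true; super.close() }
}

object ManualCloseDemo {
  // Returns (bytes in sink, sink closed after close(), sink closed after manualClose()).
  def run(): (Int, Boolean, Boolean) = {
    val sink = new TrackingSink
    val guarded = new BufferedOutputStream(sink, 16) with ManualCloseOutputStream
    guarded.write(Array[Byte](1, 2, 3))
    guarded.close()                       // flushes the buffer; the sink stays open
    val closedAfterClose = sink.reallyClosed
    guarded.manualClose()                 // now the underlying sink is closed
    (sink.size(), closedAfterClose, sink.reallyClosed)
  }
}
```

So with this trait `close()` still covers (a), since the buffered bytes reach the sink, but it defers (b) until `manualClose()` is called.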
Apart from that, having `revertPartialWritesAndClose()` call `close()` may be
causing more confusion -- on this code path, the `commit()` called from
`close()` ends up having no effect because `streamOpen` is already false by
then, but that is not obvious when reading the code. How about introducing a
private `closeResources()` that only cleans up the resources, having `close()`
call `commit()` and then `closeResources()`, and having
`revertPartialWritesAndClose()` call `closeResources()` instead of `close()`?
If the previous paragraph doesn't make much sense, I can push a commit with
this change.
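Roughly, the shape I have in mind is sketched below. This is a simplified, hypothetical stand-in rather than the real `DiskBlockObjectWriter`: `SketchWriter`, its `pending` buffer, and the `commits` counter are assumptions made only to keep the example self-contained. Only the split between `commit()`, `closeResources()`, `close()` and `revertPartialWritesAndClose()` is the point.

```scala
import java.io.{ByteArrayOutputStream, OutputStream}

// Hypothetical, simplified writer used only to illustrate the proposed call structure.
class SketchWriter(sink: OutputStream) {
  private var streamOpen = false
  private var resourcesClosed = false
  private val pending = new ByteArrayOutputStream() // uncommitted bytes
  var commits = 0                                   // illustration only

  def open(): SketchWriter = { streamOpen = true; this }

  def write(b: Int): Unit = {
    require(streamOpen, "writer is not open")
    pending.write(b)
  }

  // Pushes pending bytes to the sink; does nothing if the stream is not open.
  def commit(): Unit = {
    if (streamOpen) {
      pending.writeTo(sink)
      pending.reset()
      sink.flush()
      commits += 1
      streamOpen = false
    }
  }

  // Releases resources only; never commits.
  private def closeResources(): Unit = {
    if (!resourcesClosed) {
      sink.close()
      resourcesClosed = true
      streamOpen = false
    }
  }

  // close() = commit, then release resources.
  def close(): Unit = {
    try commit() finally closeResources()
  }

  // The revert path discards pending bytes and skips commit() entirely.
  def revertPartialWritesAndClose(): Unit = {
    pending.reset()
    closeResources()
  }
}
```

With this split, the revert path never goes through `commit()`, so nothing depends on `streamOpen` happening to be false at that point.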