Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/15982#discussion_r90121766
--- Diff:
core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java ---
@@ -337,42 +340,47 @@ void forceSorterToSpill() throws IOException {
final int numPartitions = partitioner.numPartitions();
final long[] partitionLengths = new long[numPartitions];
final InputStream[] spillInputStreams = new
FileInputStream[spills.length];
- OutputStream mergedFileOutputStream = null;
+
+ // Use a counting output stream to avoid having to close the
underlying file and ask
+ // the file system for its size after each partition is written.
+ final CountingOutputStream mergedFileOutputStream = new
CountingOutputStream(
+ new FileOutputStream(outputFile));
boolean threwException = true;
try {
for (int i = 0; i < spills.length; i++) {
spillInputStreams[i] = new FileInputStream(spills[i].file);
}
for (int partition = 0; partition < numPartitions; partition++) {
- final long initialFileLength = outputFile.length();
- mergedFileOutputStream =
- new TimeTrackingOutputStream(writeMetrics, new
FileOutputStream(outputFile, true));
+ final long initialFileLength =
mergedFileOutputStream.getByteCount();
+ // Shield the underlying output stream from close() calls, so that
we can close the higher
+ // level streams to make sure all data is really flushed and
internal state is cleaned.
+ OutputStream partitionOutput = new
CloseShieldOutputStream(mergedFileOutputStream);
+ partitionOutput =
blockManager.serializerManager().wrapForEncryption(partitionOutput);
if (compressionCodec != null) {
- mergedFileOutputStream =
compressionCodec.compressedOutputStream(mergedFileOutputStream);
+ partitionOutput =
compressionCodec.compressedOutputStream(partitionOutput);
}
-
+ partitionOutput = new TimeTrackingOutputStream(writeMetrics,
partitionOutput);
--- End diff --
another change here is that `TimeTrackingOutputStream` now goes around the
compression codec. I think that is the right change, but its at least worth
mentioning in the commit msg.
I'm wondering if this its worth having a separate jira for this, just since
it will effect metrics for all users
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]