Github user kiszk commented on a diff in the pull request:
https://github.com/apache/spark/pull/22399#discussion_r216962247
--- Diff:
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -181,42 +181,43 @@ private void writeSortedFile(boolean isLastFile) {
// around this, we pass a dummy no-op serializer.
final SerializerInstance ser = DummySerializerInstance.INSTANCE;
- final DiskBlockObjectWriter writer =
- blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes,
writeMetricsToUse);
-
int currentPartition = -1;
- final int uaoSize = UnsafeAlignedOffset.getUaoSize();
- while (sortedRecords.hasNext()) {
- sortedRecords.loadNext();
- final int partition =
sortedRecords.packedRecordPointer.getPartitionId();
- assert (partition >= currentPartition);
- if (partition != currentPartition) {
- // Switch to the new partition
- if (currentPartition != -1) {
- final FileSegment fileSegment = writer.commitAndGet();
- spillInfo.partitionLengths[currentPartition] =
fileSegment.length();
+ final FileSegment committedSegment;
+ try (final DiskBlockObjectWriter writer =
+ blockManager.getDiskWriter(blockId, file, ser,
fileBufferSizeBytes, writeMetricsToUse)) {
+
+ final int uaoSize = UnsafeAlignedOffset.getUaoSize();
+ while (sortedRecords.hasNext()) {
+ sortedRecords.loadNext();
+ final int partition =
sortedRecords.packedRecordPointer.getPartitionId();
+ assert (partition >= currentPartition);
+ if (partition != currentPartition) {
+ // Switch to the new partition
+ if (currentPartition != -1) {
+ final FileSegment fileSegment = writer.commitAndGet();
+ spillInfo.partitionLengths[currentPartition] =
fileSegment.length();
+ }
+ currentPartition = partition;
}
- currentPartition = partition;
- }
- final long recordPointer =
sortedRecords.packedRecordPointer.getRecordPointer();
- final Object recordPage = taskMemoryManager.getPage(recordPointer);
- final long recordOffsetInPage =
taskMemoryManager.getOffsetInPage(recordPointer);
- int dataRemaining = UnsafeAlignedOffset.getSize(recordPage,
recordOffsetInPage);
- long recordReadPosition = recordOffsetInPage + uaoSize; // skip over
record length
- while (dataRemaining > 0) {
- final int toTransfer = Math.min(diskWriteBufferSize,
dataRemaining);
- Platform.copyMemory(
- recordPage, recordReadPosition, writeBuffer,
Platform.BYTE_ARRAY_OFFSET, toTransfer);
- writer.write(writeBuffer, 0, toTransfer);
- recordReadPosition += toTransfer;
- dataRemaining -= toTransfer;
+ final long recordPointer =
sortedRecords.packedRecordPointer.getRecordPointer();
+ final Object recordPage = taskMemoryManager.getPage(recordPointer);
+ final long recordOffsetInPage =
taskMemoryManager.getOffsetInPage(recordPointer);
+ int dataRemaining = UnsafeAlignedOffset.getSize(recordPage,
recordOffsetInPage);
+ long recordReadPosition = recordOffsetInPage + uaoSize; // skip
over record length
+ while (dataRemaining > 0) {
+ final int toTransfer = Math.min(diskWriteBufferSize,
dataRemaining);
+ Platform.copyMemory(
+ recordPage, recordReadPosition, writeBuffer,
Platform.BYTE_ARRAY_OFFSET, toTransfer);
--- End diff --
nit: fix indentation
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]