Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/11534#discussion_r55411711
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -17,125 +17,100 @@
package org.apache.spark.storage
-import java.io.{File, FileOutputStream, IOException, RandomAccessFile}
+import java.io.{FileOutputStream, IOException, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode
-import org.apache.spark.Logging
+import com.google.common.io.Closeables
+
+import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.util.Utils
/**
* Stores BlockManager blocks on disk.
*/
-private[spark] class DiskStore(blockManager: BlockManager, diskManager:
DiskBlockManager)
- extends BlockStore(blockManager) with Logging {
+private[spark] class DiskStore(conf: SparkConf, diskManager:
DiskBlockManager) extends Logging {
- val minMemoryMapBytes =
blockManager.conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
+ private val minMemoryMapBytes =
conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
- override def getSize(blockId: BlockId): Long = {
+ def getSize(blockId: BlockId): Long = {
diskManager.getFile(blockId.name).length
}
- override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level:
StorageLevel): PutResult = {
- // So that we do not modify the input offsets !
- // duplicate does not copy buffer, so inexpensive
- val bytes = _bytes.duplicate()
+ /**
+ * Invokes the provided callback function to write the specific block.
+ *
+ * @throws IllegalStateException if the block already exists in the disk
store.
+ */
+ def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
+ if (contains(blockId)) {
+ throw new IllegalStateException(s"Block $blockId is already present
in the disk store")
+ }
logDebug(s"Attempting to put block $blockId")
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId)
- val channel = new FileOutputStream(file).getChannel
- Utils.tryWithSafeFinally {
- while (bytes.remaining > 0) {
- channel.write(bytes)
+ val fileOutputStream = new FileOutputStream(file)
+ var threwException: Boolean = true
+ try {
+ writeFunc(fileOutputStream)
+ threwException = false
+ } finally {
+ try {
+ Closeables.close(fileOutputStream, threwException)
+ } finally {
+ if (threwException) {
+ remove(blockId)
+ }
}
- } {
- channel.close()
}
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
- file.getName, Utils.bytesToString(bytes.limit), finishTime -
startTime))
- PutResult(bytes.limit(), Right(bytes.duplicate()))
+ file.getName,
+ Utils.bytesToString(file.length()),
+ finishTime - startTime))
}
- override def putIterator(
- blockId: BlockId,
- values: Iterator[Any],
- level: StorageLevel,
- returnValues: Boolean): PutResult = {
-
- logDebug(s"Attempting to write values for block $blockId")
- val startTime = System.currentTimeMillis
- val file = diskManager.getFile(blockId)
- val outputStream = new FileOutputStream(file)
- try {
+ def putBytes(blockId: BlockId, _bytes: ByteBuffer): Unit = {
+ // So that we do not modify the input offsets !
+ // duplicate does not copy buffer, so inexpensive
+ val bytes = _bytes.duplicate()
+ put(blockId) { fileOutputStream =>
+ val channel = fileOutputStream.getChannel
Utils.tryWithSafeFinally {
- blockManager.dataSerializeStream(blockId, outputStream, values)
+ while (bytes.remaining > 0) {
+ channel.write(bytes)
+ }
} {
- // Close outputStream here because it should be closed before file
is deleted.
- outputStream.close()
+ channel.close()
}
- } catch {
- case e: Throwable =>
- if (file.exists()) {
- if (!file.delete()) {
- logWarning(s"Error deleting ${file}")
- }
- }
- throw e
- }
-
- val length = file.length
-
- val timeTaken = System.currentTimeMillis - startTime
- logDebug("Block %s stored as %s file on disk in %d ms".format(
- file.getName, Utils.bytesToString(length), timeTaken))
-
- if (returnValues) {
- // Return a byte buffer for the contents of the file
- val buffer = getBytes(blockId).get
- PutResult(length, Right(buffer))
- } else {
- PutResult(length, null)
}
}
- private def getBytes(file: File, offset: Long, length: Long):
Option[ByteBuffer] = {
+ def getBytes(blockId: BlockId): ByteBuffer = {
+ val file = diskManager.getFile(blockId.name)
val channel = new RandomAccessFile(file, "r").getChannel
Utils.tryWithSafeFinally {
// For small files, directly read rather than memory map
- if (length < minMemoryMapBytes) {
- val buf = ByteBuffer.allocate(length.toInt)
- channel.position(offset)
+ if (file.length < minMemoryMapBytes) {
+ val buf = ByteBuffer.allocate(file.length.toInt)
+ channel.position(0)
while (buf.remaining() != 0) {
if (channel.read(buf) == -1) {
throw new IOException("Reached EOF before filling buffer\n" +
-
s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+
s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
}
}
buf.flip()
- Some(buf)
+ buf
} else {
- Some(channel.map(MapMode.READ_ONLY, offset, length))
+ channel.map(MapMode.READ_ONLY, 0, file.length)
}
} {
channel.close()
}
}
- override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
- val file = diskManager.getFile(blockId.name)
- getBytes(file, 0, file.length)
- }
-
- def getBytes(segment: FileSegment): Option[ByteBuffer] = {
- getBytes(segment.file, segment.offset, segment.length)
- }
-
- override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
- getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId,
buffer))
- }
-
- override def remove(blockId: BlockId): Boolean = {
+ def remove(blockId: BlockId): Boolean = {
--- End diff --
Removals from the stores are idempotent, so the return value lets the
caller check whether the removal actually deleted a block (see usage in
`BlockManager.removeBlock()`).
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA
ticket with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]