Github user liyezhang556520 commented on a diff in the pull request:
https://github.com/apache/spark/pull/791#discussion_r17463714
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -230,17 +236,27 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val selectedBlocks = new ArrayBuffer[BlockId]()
var selectedMemory = 0L
- // This is synchronized to ensure that the set of entries is not changed
- // (because of getValue or getBytes) while traversing the iterator, as that
- // can lead to exceptions.
- entries.synchronized {
- val iterator = entries.entrySet().iterator()
- while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
- val pair = iterator.next()
- val blockId = pair.getKey
- if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
- selectedBlocks += blockId
- selectedMemory += pair.getValue.size
+ // This lock ensures that the selection and marking for the to-be-dropped blocks
+ // is done by only one thread at a time. Otherwise if one thread has selected some
+ // blocks and going to mark them as dropping, another thread may select the same blocks
+ // and mark them twice, which leads to incorrect calculation of free space.
+ selectLock.synchronized {
+
+ // This is synchronized to ensure that the set of entries is not changed
+ // (because of getValue or getBytes) while traversing the iterator, as that
+ // can lead to exceptions.
+ entries.synchronized {
+ val iterator = entries.entrySet().iterator()
+ while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
+ val pair = iterator.next()
+ val entry = pair.getValue
+ if (!entry.dropping) {
+ val blockId = pair.getKey
+ if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
+ selectedBlocks += blockId
+ selectedMemory += entry.size
+ }
+ }
}
}
}
--- End diff --
@cloud-fan @andrewor14
Hi cloud-fan, I think there will be a problem if you don't update
`currentMemory`. Suppose there are two threads. The first one acquires
`selectLock`, finishes its selection, and releases the lock, but
`currentMemory` has not been updated yet. When the second thread then acquires
`selectLock`, it sees the same `currentMemory` value as the first thread, so
the free memory `maxMemory - currentMemory` is counted twice, once by each
thread. As a result, the `selectedMemory` for the second thread is smaller
than what it actually requires.
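The double counting can be checked with concrete numbers. Take `maxMemory = 100` and `currentMemory = 80` (20 bytes free), with two threads that each want to store 50 bytes. Reading the same `currentMemory`, each thread concludes it only needs to drop 30 bytes; together they drop 60, leaving 20 + 60 = 80 bytes for 100 bytes of new data. The sketch below models this arithmetic and one possible remedy, in which each thread, while holding the lock, records both the bytes it marked for dropping and the space it claimed. `ModelStore`, `droppingMemory`, and `reservedMemory` are hypothetical names for illustration, not fields in Spark or in this PR.

```scala
// Hypothetical arithmetic model: two threads choose how much to drop before
// either of them has updated currentMemory. Not Spark's implementation.
final class ModelStore(val maxMemory: Long, val currentMemory: Long) {
  private val selectLock = new Object
  private var droppingMemory = 0L // bytes already selected for dropping
  private var reservedMemory = 0L // bytes claimed by puts not yet in currentMemory

  // Naive: each caller trusts currentMemory alone, so the same free bytes
  // are counted by every caller.
  def naiveBytesToDrop(space: Long): Long = selectLock.synchronized {
    math.max(0L, space - (maxMemory - currentMemory))
  }

  // Reserving: also account for drops and puts decided by earlier callers.
  def reservingBytesToDrop(space: Long): Long = selectLock.synchronized {
    val used = currentMemory - droppingMemory + reservedMemory
    val toDrop = math.max(0L, space - (maxMemory - used))
    droppingMemory += toDrop
    reservedMemory += space
    toDrop
  }
}
```

With these numbers the naive variant returns 30 for both calls (60 in total), while the reserving variant returns 30 and then 50 (80 in total), which is exactly enough for both puts. A real fix would also have to release the reservations once `currentMemory` is actually updated, which this model omits.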