Github user srinathshankar commented on a diff in the pull request:
https://github.com/apache/spark/pull/15043#discussion_r79061246
--- Diff:
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -794,13 +821,17 @@ private[storage] class PartiallySerializedBlock[T](
   * `close()` on it to free its resources.
   */
  def valuesIterator: PartiallyUnrolledIterator[T] = {
+    verifyNotConsumedAndNotDiscarded()
+    // Close the serialization stream so that the serializer's internal buffers are freed and any
+    // "end-of-stream" markers can be written out so that `unrolled` is a valid serialized stream.
+    serializationStream.close()
     // `unrolled`'s underlying buffers will be freed once this input stream is fully read:
     val unrolledIter = serializerManager.dataDeserializeStream(
-      blockId, unrolled.toInputStream(dispose = true))(classTag)
+      blockId, unrolledBuffer.toInputStream(dispose = true))(classTag)
     new PartiallyUnrolledIterator(
       memoryStore,
       unrollMemory,
-      unrolled = CompletionIterator[T, Iterator[T]](unrolledIter, discard()),
+      unrolled = CompletionIterator[T, Iterator[T]](unrolledIter, unrolledBuffer.dispose()),
--- End diff --
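
For context on the changed line: CompletionIterator wraps an iterator and runs a
cleanup callback once the wrapped iterator is exhausted. A minimal sketch of that
pattern follows; it is illustrative only, not Spark's actual implementation, and
the class name is made up:

// Minimal sketch of the CompletionIterator pattern: run a cleanup action
// exactly once, as soon as the wrapped iterator reports no more elements.
// Not Spark's actual implementation; the name is hypothetical.
class CompletionIteratorSketch[A](sub: Iterator[A], completion: => Unit)
  extends Iterator[A] {

  private var completed = false

  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !completed) {
      completed = true
      completion // e.g. unrolledBuffer.dispose() in the new code above
    }
    more
  }

  override def next(): A = sub.next()
}

Under the old code the callback was discard(); under the new code it is
unrolledBuffer.dispose(), which, judging by the names, frees only the byte
buffer. That difference is what the question below is about.
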
Why the change from discard() to dispose()? You've made discard() idempotent,
right? Does the caller have to manually release memory after the iterator is
consumed?
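
For reference, the kind of idempotent discard() the comment asks about is
typically a guard flag. A hedged sketch, with made-up names (Disposable,
releaseUnrollMemory, DiscardableBlockSketch) rather than the exact fields in
MemoryStore.scala:

// Hedged sketch of an idempotent discard(): the guard flag makes a second
// call a no-op, so cleanup can be triggered both by a completion callback
// and by an explicit close() without double-freeing. All names here are
// illustrative, not the actual members of PartiallySerializedBlock.
trait Disposable { def dispose(): Unit }

class DiscardableBlockSketch(buffer: Disposable, releaseUnrollMemory: () => Unit) {
  private var discarded = false

  def discard(): Unit = {
    if (!discarded) {
      discarded = true
      buffer.dispose()        // free the serialized byte buffer
      releaseUnrollMemory()   // return the unroll memory to the memory store
    }
  }
}

With a guard like that, reaching discard() from more than one cleanup path is
safe, which is presumably why the idempotency question matters here.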