Github user massie commented on the pull request:
https://github.com/apache/spark/pull/6423#issuecomment-110166232
@squito I appreciate the feedback. No need to apologize for asking for an
explanation of why this PR is important.
This work is related to Parquet only indirectly: the change itself is
format-agnostic, but it enables the Parquet Shuffle Manager I'm working on.
Currently in Spark, we have a `BlockStoreShuffleFetcher.fetch()` method
which allows callers to iterate over records in shuffle blocks. It assumes that
data is stored and retrieved using a Spark `Serializer` (which is passed as a
parameter). This unnecessarily limits Spark `ShuffleManager` implementations in
how they can store and retrieve data, since the Spark `Serializer` interface
assumes data is record-oriented and stream-based, and not all data is
(Parquet, for example, is column-oriented and `File`-based).
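For reference, the current record-oriented entry point looks roughly like this
(signature paraphrased from memory and wrapped in a throwaway trait just so the
sketch compiles; see the diff for the exact definition):
```scala
import org.apache.spark.TaskContext
import org.apache.spark.serializer.Serializer

// Paraphrased sketch of the current API -- not the literal definition
trait CurrentFetcherSketch {
  // A Serializer must be threaded through the fetcher, so every
  // ShuffleManager is forced into record-oriented, stream-based storage
  def fetch[T](
      shuffleId: Int,
      reduceId: Int,
      context: TaskContext,
      serializer: Serializer): Iterator[T]
}
```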
With this update, the `BlockStoreShuffleFetcher.fetch()` method is replaced
by a `BlockStoreShuffleFetcher.fetchBlockStreams()` method which returns an
`Iterator[(BlockId, InputStream)]`. This allows the `ShuffleReader` to access
the raw `InputStream` directly. You can see that the serialization code has now
been moved into the
[HashShuffleReader](https://github.com/massie/spark/commit/19135f298e215ae11f4c8fd3b8c51147fd8bcc46#diff-8a4767e031dd2a90c7f50e82ae23a441R40).
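In rough terms, the reader now does something like this (a minimal sketch, not
the literal commit; I'm assuming `dep`, `handle`, `startPartition`, `context`,
and `blockManager` are in scope, as they are in `HashShuffleReader`):
```scala
// Fetch raw block streams; deserialization is now the reader's job
val blockStreams = BlockStoreShuffleFetcher.fetchBlockStreams(
  handle.shuffleId, startPartition, context)
val serializerInstance = Serializer.getSerializer(dep.serializer).newInstance()
val recordIterator = blockStreams.flatMap { case (blockId, inputStream) =>
  // Undo any block-level compression before deserializing
  val wrappedStream = blockManager.wrapForCompression(blockId, inputStream)
  serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
}
```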
This cleaner separation of concerns gives `ShuffleManager` developers more
flexibility in storing and retrieving shuffle records. As an example, you can
see my current work on a
[ParquetShuffleReader](https://github.com/massie/spark/blob/parquet-shuffle-backup/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleReader.scala#L45)
which has the following...
```scala
val blockStreams = BlockStoreShuffleFetcher.fetchBlockStreams(
  handle.shuffleId, startPartition, context)
val recordIterator = blockStreams.flatMap { case (blockId, inputStream) =>
  // Parquet readers are file-based, so copy the raw shuffle block to a
  // temporary local file first
  val (tempBlockId, tempBlock) =
    blockManager.diskBlockManager.createTempLocalBlock()
  Files.copy(inputStream, tempBlock.toPath)
  inputStream.close()
  // Read the records back out of the temporary Parquet file
  val reader = new AvroParquetReader[AvroProduct2[AnyRef, AnyRef]](
    new Path(tempBlock.getCanonicalPath))
  val iterator = Iterator.continually(reader.read()).takeWhile(_ != null)
  // Close the reader once the record iterator is exhausted
  CompletionIterator[AvroProduct2[AnyRef, AnyRef],
      Iterator[AvroProduct2[AnyRef, AnyRef]]](iterator, {
    reader.close()
  })
}
```
...which calls `fetchBlockStreams()`, copies the data to temporary local
blocks, and then reads the records from the Parquet blocks using an
`AvroParquetReader`. This `ParquetShuffleReader` would not be possible with
the existing `Serializer`-based `BlockStoreShuffleFetcher`.
Please let me know if there's anything about this explanation that isn't
clear. It's a subtle but important change: moving the serialization code out of
the `BlockStoreShuffleFetcher` and into the `ShuffleReader` implementations.
The `BlockStoreShuffleFetcher` fetches blocks now -- not records.