Github user massie commented on the pull request:

    https://github.com/apache/spark/pull/6423#issuecomment-110166232
  
    @squito I appreciate the feedback. No need to apologize for asking for an 
explanation of why this PR is important.
    
    This work is only indirectly related to Parquet (since I'm working on a 
Parquet Shuffle Manager).
    
    Currently in Spark, we have a `BlockStoreShuffleFetcher.fetch()` method 
which allows callers to iterate over records in shuffle blocks. It assumes that 
data is stored and retrieved using a Spark `Serializer` (which is passed as a 
parameter). This unnecessarily limits how Spark `ShuffleManager` implementations 
can store and retrieve data, because the Spark `Serializer` interface assumes 
record-oriented, stream-based data, and not all data fits that model (e.g. 
Parquet is column-oriented and `File`-based).
    
    With this update, the `BlockStoreShuffleFetcher.fetch()` method is replaced 
with a `BlockStoreShuffleFetcher.fetchBlockStreams()` method, which returns an 
`Iterator[(BlockId, InputStream)]`. This allows the `ShuffleReader` to access 
the raw `InputStream` directly. You can see that the serialization code has now 
been moved into the 
[HashShuffleReader](https://github.com/massie/spark/commit/19135f298e215ae11f4c8fd3b8c51147fd8bcc46#diff-8a4767e031dd2a90c7f50e82ae23a441R40).
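    
    To make the split concrete, here is a minimal, self-contained sketch of the 
idea. It does not use Spark's actual classes: `BlockId`, `fetchBlockStreams`, 
`readRecords` and `encode` below are hypothetical stand-ins, and the 
`DataInputStream` encoding is only an assumption chosen to keep the example 
runnable. The point is that the fetcher hands back raw streams and the reader 
alone decides how to turn them into records.
    
    ```scala
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream,
      DataOutputStream, EOFException, InputStream}

    object BlockStreamSketch {
      // Hypothetical stand-in for Spark's BlockId.
      final case class BlockId(name: String)

      // Fetcher side: returns raw streams and knows nothing about records.
      def fetchBlockStreams(
          blocks: Seq[(BlockId, Array[Byte])]): Iterator[(BlockId, InputStream)] =
        blocks.iterator.map { case (id, bytes) => (id, new ByteArrayInputStream(bytes)) }

      // Reader side: the decoding logic lives here, not in the fetcher.
      def readRecords(
          streams: Iterator[(BlockId, InputStream)]): Iterator[(String, Int)] =
        streams.flatMap { case (_, in) =>
          val data = new DataInputStream(in)
          Iterator
            .continually {
              // readUTF throws EOFException once the block is exhausted.
              try Some((data.readUTF(), data.readInt()))
              catch { case _: EOFException => data.close(); None }
            }
            .takeWhile(_.isDefined)
            .map(_.get)
        }

      // Helper that writes (key, value) records in the format readRecords expects.
      def encode(records: Seq[(String, Int)]): Array[Byte] = {
        val bytes = new ByteArrayOutputStream()
        val out = new DataOutputStream(bytes)
        records.foreach { case (k, v) => out.writeUTF(k); out.writeInt(v) }
        out.flush()
        bytes.toByteArray
      }

      def main(args: Array[String]): Unit = {
        // Block names are illustrative only.
        val blocks = Seq(
          BlockId("shuffle_0_0_0") -> encode(Seq("a" -> 1, "b" -> 2)),
          BlockId("shuffle_0_1_0") -> encode(Seq("c" -> 3)))
        readRecords(fetchBlockStreams(blocks)).foreach(println)
      }
    }
    ```
    
    A different reader could consume the same `fetchBlockStreams` output with a 
completely different decoding strategy, which is the flexibility this change is 
after.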
    
    This cleaner separation of concerns allows `ShuffleManager` developers more 
flexibility in storing and retrieving shuffle records. As an example, you can 
see my current work on a 
[ParquetShuffleReader](https://github.com/massie/spark/blob/parquet-shuffle-backup/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleReader.scala#L45)
 which has the following...
    
    ```scala
    // Fetch raw block streams instead of deserialized records.
    val blockStreams = BlockStoreShuffleFetcher.fetchBlockStreams(
      handle.shuffleId, startPartition, context)

    val recordIterator = blockStreams.flatMap { case (blockId, inputStream) =>
      // Parquet reads from files, so copy each stream to a temporary local block.
      val (tempBlockId, tempBlock) =
        blockManager.diskBlockManager.createTempLocalBlock()
      Files.copy(inputStream, tempBlock.toPath)
      inputStream.close()
      val reader = new AvroParquetReader[AvroProduct2[AnyRef, AnyRef]](
        new Path(tempBlock.getCanonicalPath))
      // read() returns null once the file is exhausted.
      val iterator = Iterator.continually(reader.read()).takeWhile(_ != null)
      // Close the Parquet reader once all records have been consumed.
      CompletionIterator[AvroProduct2[AnyRef, AnyRef],
        Iterator[AvroProduct2[AnyRef, AnyRef]]](iterator, {
          reader.close()
      })
    }
    ```
    
    ...which calls `fetchBlockStreams`, copies the data to temporary local 
blocks, and then reads the records from the Parquet blocks using an 
`AvroParquetReader`. This `ParquetShuffleReader` would not be possible using 
the existing `Serializer`-based `BlockStoreShuffleFetcher`.
    
    Please let me know if there's anything about this explanation that isn't 
clear. It's a subtle but important change: moving the serialization code out of 
the `BlockStoreShuffleFetcher` and into the `ShuffleReader` implementations. 
The `BlockStoreShuffleFetcher` fetches blocks now -- not records.

