Github user squito commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    All good questions and stuff I had wondered about too -- I should actually 
be sure to comment on these on the jira as well:
    
    > I recall that the problem with large shuffle blocks was that the 
OneForOneBlockFetcher strategy basically read the entire block as a single 
chunk, which becomes a problem for large blocks. I understand that we have now 
removed this limitation for shuffles by using a streaming transfer strategy 
only for large blocks (above some threshold). Is this patch conceptually doing 
the same thing for push-based communication where the action is initiated by a 
sender (e.g. to push a block for replication)? 
    
    yes
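    
    For concreteness, the idea reads roughly like the sketch below -- purely illustrative, where `pushAsStream`, `pushAsOneMessage`, and `maxDirectPushBytes` are made-up names for this comment, not actual Spark APIs:
    
    ```scala
    // Illustrative only: choose a streaming upload for blocks above a size
    // threshold, and keep the existing single-message push otherwise.
    object PushStrategySketch {
      def pushBlock(blockSize: Long, maxDirectPushBytes: Long): Unit = {
        if (blockSize > maxDirectPushBytes) {
          pushAsStream(blockSize)      // receiver consumes the block chunk by chunk
        } else {
          pushAsOneMessage(blockSize)  // whole block goes out as one message
        }
      }
      private def pushAsStream(size: Long): Unit =
        println(s"streaming $size bytes")
      private def pushAsOneMessage(size: Long): Unit =
        println(s"pushing $size bytes in a single message")
    }
    ```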
    
    > Does it also affect pull-based remote cache block reads or will that be 
handled separately?
    
    That was already handled by https://issues.apache.org/jira/browse/SPARK-22062 (despite the title saying it's something else entirely).  That said, I recently discovered that my test covering this for large blocks was incorrect, so I need to reconfirm it (I need to rearrange my test a little, and I've got a different aspect of this in flight, so it will probably take a couple of days).
    
    > Given that we already seem to have pull-based openStream() calls which 
can be initiated from the receive side, could we simplify things here by 
pushing a "this value is big, pull it" message and then have the remote end 
initiate a streaming read, similar to how DirectTaskResult and 
IndirectTaskResult work?
    
    It's certainly possible to do this, and I started taking this approach, but I stopped because [replication is synchronous](https://github.com/apache/spark/blob/bfd75cdfb22a8c2fb005da597621e1ccd3990e82/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1344).  So you'd have to add a callback for when the block is finally fetched, to go back to this initial call -- but also add timeout logic to avoid waiting forever if the destination went away.  It all seemed much more complicated than doing it the way I'm proposing here.
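    
    For what it's worth, here's a rough sketch (hypothetical names throughout, not code from this PR) of why that pull-based alternative gets awkward with synchronous replication -- the sender would have to park on a promise that the receiver completes once its `openStream()` fetch finishes, plus a timeout in case the destination never pulls:
    
    ```scala
    import java.util.concurrent.{TimeUnit, TimeoutException}
    
    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration.Duration
    
    object PullBasedReplicationSketch {
      // sendPullRequest stands in for a "this block is big, come pull it"
      // message; the receiver would complete the promise once its fetch is done.
      def replicateLargeBlock(
          sendPullRequest: Promise[Unit] => Unit,
          timeoutMs: Long): Boolean = {
        val pulled = Promise[Unit]()
        sendPullRequest(pulled)
        try {
          // replication is synchronous, so the caller has to block here until
          // the peer has actually streamed the block out of this node
          Await.result(pulled.future, Duration(timeoutMs, TimeUnit.MILLISECONDS))
          true
        } catch {
          case _: TimeoutException => false  // destination went away, give up
        }
      }
    }
    ```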
    
    > For remote reads of large cached blocks: is it true that this works today 
only if the block is on disk but fails if the block is in memory? If certain 
size limit problems only occur when things are cached in memory, can we 
simplify anything if we add a requirement that blocks above 2GB can only be 
cached on disk (regardless of storage level)?
    
    Correct; I'm currently investigating what we can do to address this.  (Sorry, again I discovered my test was broken shortly after posting this.)  It would certainly simplify things if we only supported this for disk-cached blocks -- but what exactly are you proposing?  Just failing when it's cached in memory and telling the user to rerun with disk caching?  Changing the block manager to automatically cache on disk _also_ when the block is > 2GB?  Or, when sending the block, just writing it to a temp file and then sending from that?
    
    The problem here is on the sending side, not the receiving side; Netty uses an [`int` to manage the length of a `ByteBuf`-based message](https://github.com/netty/netty/blob/netty-4.1.17.Final/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java#L178-L180), but it uses a [`long` for a `FileRegion`-based message](https://github.com/netty/netty/blob/netty-4.1.17.Final/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java#L213-L224) (the code is a little different in the latest on branch 4.1, but the same problem is still there).  I'm investigating making a `FileRegion` that is actually backed by a `ChunkedByteBuffer`.
    
    But that would go into another JIRA under SPARK-6235.
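    
    Purely as an illustration of that direction (the class name and details are my own guesses at what such a thing could look like, with the `ChunkedByteBuffer` stood in for by its underlying array of `ByteBuffer` chunks -- nothing here is committed code):
    
    ```scala
    import java.nio.ByteBuffer
    import java.nio.channels.WritableByteChannel
    
    import io.netty.channel.FileRegion
    import io.netty.util.AbstractReferenceCounted
    
    // A FileRegion whose bytes come from in-memory chunks instead of a file,
    // so Netty tracks its length with a `long` rather than an `int`.
    class ChunkedByteBufferFileRegion(chunks: Array[ByteBuffer])
      extends AbstractReferenceCounted with FileRegion {
    
      private val totalSize: Long = chunks.map(_.remaining().toLong).sum
      private var written: Long = 0L
      private var currentChunk = 0
    
      override def position(): Long = 0L
      override def count(): Long = totalSize
      override def transferred(): Long = written
      override def transfered(): Long = written  // deprecated pre-4.1 spelling
    
      // Netty calls this repeatedly until transferred() == count(); each call
      // writes as much of the remaining chunks as the channel will accept.
      override def transferTo(target: WritableByteChannel, position: Long): Long = {
        var writtenNow = 0L
        var channelFull = false
        while (!channelFull && currentChunk < chunks.length) {
          val buf = chunks(currentChunk)
          writtenNow += target.write(buf)
          if (buf.hasRemaining) {
            channelFull = true   // channel can't take more right now
          } else {
            currentChunk += 1    // move on to the next in-memory chunk
          }
        }
        written += writtenNow
        writtenNow
      }
    
      override protected def deallocate(): Unit = {}  // nothing to free here
    
      override def retain(): FileRegion = { super.retain(); this }
      override def retain(increment: Int): FileRegion = { super.retain(increment); this }
      override def touch(): FileRegion = this
      override def touch(hint: AnyRef): FileRegion = this
    }
    ```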

