Github user squito commented on the issue:
https://github.com/apache/spark/pull/21346
All good questions, and things I had wondered about too -- I should be sure
to comment on these on the jira as well:
> I recall that the problem with large shuffle blocks was that the
OneForOneBlockFetcher strategy basically read the entire block as a single
chunk, which becomes a problem for large blocks. I understand that we have now
removed this limitation for shuffles by using a streaming transfer strategy
only for large blocks (above some threshold). Is this patch conceptually doing
the same thing for push-based communication where the action is initiated by a
sender (e.g. to push a block for replication)?
Yes.
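To illustrate the idea being confirmed here, a minimal sketch of the size-threshold dispatch: blocks above some configured limit are streamed in frames instead of being materialized as a single chunk. The class, constant, and threshold value below are illustrative assumptions, not Spark's actual API.

```java
// Hypothetical sketch of threshold-based transfer selection; the names and
// the threshold value are illustrative, not Spark's real configuration.
public class TransferStrategy {
    // Assumed cutoff above which a block is streamed rather than read as a
    // single in-memory chunk (the real limit is configurable in Spark).
    static final long MAX_DIRECT_TRANSFER_SIZE = 200L * 1024 * 1024;

    enum Strategy { SINGLE_CHUNK, STREAMED }

    // Small blocks go as one buffer; large ones are streamed, which avoids
    // the single-chunk size limit described above.
    static Strategy choose(long blockSize) {
        return blockSize > MAX_DIRECT_TRANSFER_SIZE
            ? Strategy.STREAMED
            : Strategy.SINGLE_CHUNK;
    }
}
```

The same decision would apply on the push side: the sender, knowing the block size up front, picks the streaming path before initiating the transfer.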
> Does it also affect pull-based remote cache block reads or will that be
handled separately?
That was already handled by
https://issues.apache.org/jira/browse/SPARK-22062 (despite the title saying it's
something else entirely). That said, I recently discovered that my tests
covering large blocks were incorrect, so I need to reconfirm this (I need to
rearrange my test a little, and I've got a different aspect of this in flight,
so it will probably take a couple of days).
> Given that we already seem to have pull-based openStream() calls which
can be initiated from the receive side, could we simplify things here by
pushing a "this value is big, pull it" message and then have the remote end
initiate a streaming read, similar to how DirectTaskResult and
IndirectTaskResult work?
It's certainly possible to do this, and I started taking this approach, but
I stopped because [replication is
synchronous](https://github.com/apache/spark/blob/bfd75cdfb22a8c2fb005da597621e1ccd3990e82/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1344).
You'd have to add a callback for when the block is finally fetched, to get
back to that initial call -- but also add timeout logic to avoid waiting
forever if the destination went away. It all seemed much more complicated than
doing it the way I'm proposing here.
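To make the complication concrete, a minimal sketch of what the pull-based alternative would force on the push side: the synchronous replication call has to park on a future that the remote fetch completes later, plus a timeout for the case where the destination disappears. The method and parameter names are hypothetical, purely for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PullBasedReplication {
    // Hypothetical sketch: the sender pushes a "this value is big, pull it"
    // message, then must block its synchronous replicate() call until the
    // receiver reports the fetch finished -- or give up after a timeout so
    // a vanished destination doesn't hang the caller forever.
    static boolean replicateViaPull(CompletableFuture<Boolean> remoteFetchDone,
                                    long timeoutMs) {
        try {
            return remoteFetchDone.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return false; // destination went away; stop waiting
        } catch (Exception e) {
            return false; // interrupted or fetch failed
        }
    }
}
```

Each of these pieces (the callback wiring, the timeout policy, the failure paths) is machinery the push-based approach avoids.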
> For remote reads of large cached blocks: is it true that this works today
only if the block is on disk but fails if the block is in memory? If certain
size limit problems only occur when things are cached in memory, can we
simplify anything if we add a requirement that blocks above 2GB can only be
cached on disk (regardless of storage level)?
Correct; I'm currently investigating what we can do to address this.
(Sorry, again, I discovered my test was broken shortly after posting this.) It
would certainly simplify things if we only supported this for disk-cached
blocks -- but what exactly are you proposing? Just failing when it's cached in
memory and telling the user to rerun with disk caching? Changing the block
manager to _also_ automatically cache on disk when the block is > 2GB? Or,
when sending the block, just writing it to a temp file and sending from that?
The problem here is on the sending side, not the receiving side; netty uses
an [`int` to manage the length of a `ByteBuf`-based
msg](https://github.com/netty/netty/blob/netty-4.1.17.Final/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java#L178-L180),
but a [`long` for a `FileRegion`-based
msg](https://github.com/netty/netty/blob/netty-4.1.17.Final/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java#L213-L224)
(the code is a little different in the latest on branch 4.1, but the same
problem is still there). I'm investigating making a `FileRegion` that is
actually backed by a `ChunkedByteBuffer`.
But that would go into another jira under SPARK-6235.
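A minimal sketch of that idea: a `FileRegion`-like message whose total length and transfer progress are tracked as `long`s, backed by a sequence of `ByteBuffer`s (as in Spark's `ChunkedByteBuffer`) rather than a file. To stay self-contained this mimics the `FileRegion` contract instead of extending netty's `io.netty.channel.FileRegion`; the class name and shape here are assumptions, not the actual implementation.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical sketch: a FileRegion-style region backed by chunked buffers,
// so the length is a long and can exceed Integer.MAX_VALUE even though no
// single ByteBuffer chunk can.
public class ChunkedByteBufferRegion {
    private final ByteBuffer[] chunks;
    private final long count;      // total bytes, as a long -- > 2GB is fine
    private long transferred = 0L; // long-based progress, unlike ByteBuf's int

    ChunkedByteBufferRegion(ByteBuffer[] chunks) {
        this.chunks = chunks;
        long total = 0L;
        for (ByteBuffer b : chunks) {
            total += b.remaining();
        }
        this.count = total;
    }

    long count() { return count; }

    long transferred() { return transferred; }

    // Write as many bytes as the channel will accept, advancing across the
    // chunk boundaries; callers invoke this repeatedly until transferred()
    // reaches count(), as netty does for a FileRegion.
    long transferTo(WritableByteChannel target) throws IOException {
        long written = 0L;
        for (ByteBuffer chunk : chunks) {
            if (!chunk.hasRemaining()) continue;
            written += target.write(chunk);
            if (chunk.hasRemaining()) break; // channel is full for now
        }
        transferred += written;
        return written;
    }
}
```

The key point the sketch illustrates is that nothing in the `FileRegion` path forces an `int` length, so a chunked in-memory backing could reuse that path for > 2GB blocks.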