Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/6990#discussion_r33969985
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
---
@@ -833,8 +833,10 @@ private[spark] class BlockManager(
         logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
         // Either we're storing bytes and we asynchronously started replication, or we're storing
-        // values and need to serialize and replicate them now:
-        if (putLevel.replication > 1) {
+        // values and need to serialize and replicate them now.
+        // Should not replicate the block if its StorageLevel is StorageLevel.NONE or
+        // if the local put failed.
+        if (!putBlockInfo.isFailed && putLevel.replication > 1) {
--- End diff --
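For context, a minimal, self-contained Scala sketch of the guard this hunk introduces; BlockInfo and StorageLevel below are simplified stand-ins for the real Spark types, not the actual BlockManager internals:

    // Minimal sketch of the replication guard added by this hunk.
    // BlockInfo and StorageLevel are simplified stand-ins, not Spark's types.
    object ReplicationGuardSketch {
      case class BlockInfo(isFailed: Boolean)
      case class StorageLevel(replication: Int)

      def shouldReplicate(putBlockInfo: BlockInfo, putLevel: StorageLevel): Boolean =
        // Replicate only when the local put succeeded and the level asks for
        // more than one replica (StorageLevel.NONE has replication == 1, so
        // the second clause already excludes it).
        !putBlockInfo.isFailed && putLevel.replication > 1
    }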
I will let @andrewor14 chime in on this, to figure out what the best
solution is here for Spark Core: change the BlockManager, or not.
Accordingly, this PR should either change the BlockManager or change
ReceivedBlockHandler.
On Sat, Jul 4, 2015 at 12:48 AM, Dibyendu Bhattacharya <[email protected]> wrote:
> In core/src/main/scala/org/apache/spark/storage/BlockManager.scala
> <https://github.com/apache/spark/pull/6990#discussion_r33885646>:
>
> > @@ -833,8 +833,10 @@ private[spark] class BlockManager(
> >         logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
> >
> >         // Either we're storing bytes and we asynchronously started replication, or we're storing
> > -        // values and need to serialize and replicate them now:
> > -        if (putLevel.replication > 1) {
> > +        // values and need to serialize and replicate them now.
> > +        // Should not replicate the block if its StorageLevel is StorageLevel.NONE or
> > +        // if the local put failed.
> > +        if (!putBlockInfo.isFailed && putLevel.replication > 1) {
>
> hi @tdas <https://github.com/tdas>, the implementation of the BlockManager
> put* methods (putArray, putIterator) does exactly the same thing: it calls
> MemoryStore.unrollSafely, and only if the block is unrolled safely does it
> cache the block. What this PR does is: if the block is not unrolled to
> memory and the storage level does not use disk, it stops the block from
> being replicated, and at the same time the updatedBlocks result will not
> contain the blockId, which leads ReceivedBlockHandler to throw the
> exception. As I said, this fix won't impact the CacheManager flow, since
> CacheManager explicitly follows the same flow (check unrollSafely first,
> then cache the block).
>
> Not sure why you think this is not a strong use case. For high-volume
> receivers (say, 50+ receivers consuming from hundreds of partitions of a
> large Kafka topic), the replication process is triggered unnecessarily
> even when a block cannot be stored in local memory, and those blocks will
> never be used in the streaming jobs. This means receivers running on
> remote peers will also face memory issues, as lots of unnecessary blocks
> clog their memory. So I see this leading to a chain of events that
> impacts every receiver.
>
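To illustrate the flow described in the quoted reply, here is a rough, runnable Scala sketch; the function shape, parameter names, and the "updated blocks" return value are simplified stand-ins for the real BlockManager/ReceivedBlockHandler interaction, not actual Spark code:

    // Illustrative sketch of the put/replication flow under discussion.
    // All names below are simplified stand-ins, not actual Spark internals.
    object PutFlowSketch {
      def putBlock(blockId: String,
                   fitsInMemory: Boolean, // stand-in for MemoryStore.unrollSafely succeeding
                   useDisk: Boolean,      // the StorageLevel allows a disk fallback
                   replication: Int): Seq[String] = {
        val storedLocally = fitsInMemory || useDisk
        if (storedLocally && replication > 1) {
          // Only a block that was actually stored locally is shipped to peers.
          // (replication to peers would happen here)
        }
        // The returned "updated blocks" list: ReceivedBlockHandler inspects it
        // to confirm the block was stored; an empty result makes it throw.
        if (storedLocally) Seq(blockId) else Seq.empty
      }

      def main(args: Array[String]): Unit = {
        // A block that neither unrolls to memory nor spills to disk:
        // replication is skipped and no blockId is reported back.
        println(putBlock("input-0-1", fitsInMemory = false, useDisk = false, replication = 2))
      }
    }

Under this sketch, a block that neither fits in memory nor may spill to disk skips replication and comes back with an empty updated-blocks list, which is the condition ReceivedBlockHandler treats as a failed store.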