[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2016-05-10 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278147#comment-15278147
 ] 

Apache Spark commented on SPARK-4452:
-

User 'lianhuiwang' has created a pull request for this issue:
https://github.com/apache/spark/pull/13027

> Shuffle data structures can starve others on the same thread for memory 
> 
>
> Key: SPARK-4452
> URL: https://issues.apache.org/jira/browse/SPARK-4452
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
>Reporter: Tianshuo Deng
>Assignee: Lianhui Wang
> Fix For: 2.0.0
>
>
> When an Aggregator is used with ExternalSorter in a task, Spark will create 
> many small files and can cause a "too many open files" error during merging.
> Currently, ShuffleMemoryManager does not work well when there are two spillable 
> objects in a thread, which in this case are ExternalSorter and 
> ExternalAppendOnlyMap (used by Aggregator). Here is an example: due to the use 
> of map-side aggregation, ExternalAppendOnlyMap is created first to read the 
> RDD. It may ask for as much memory as it can get, which is 
> totalMem/numberOfThreads. Later, when ExternalSorter is created in the same 
> thread, the ShuffleMemoryManager may refuse to allocate more memory to it, 
> since the memory was already given to the previously requesting 
> object (ExternalAppendOnlyMap). That causes the ExternalSorter to keep spilling 
> small files (due to the lack of memory).
> I'm currently working on a PR to address these two issues. It will include the 
> following changes:
> 1. The ShuffleMemoryManager should track not only the memory usage of each 
> thread, but also which object holds the memory.
> 2. The ShuffleMemoryManager should be able to trigger the spilling of a 
> spillable object. That way, if a new object in a thread requests 
> memory, the old occupant can be evicted/spilled. Previously, spillable 
> objects triggered spilling themselves, so one might not spill even 
> when another object in the same thread needed more memory. After this change, the 
> ShuffleMemoryManager can trigger the spilling of an object whenever it needs to.
> 3. Make the iterator of ExternalAppendOnlyMap spillable. Previously, 
> ExternalAppendOnlyMap returned a destructive iterator and could not be spilled 
> after the iterator was returned. This should be changed so that the 
> ShuffleMemoryManager can still spill it even after the iterator has been returned.
> Currently, I have a working branch in progress: 
> https://github.com/tsdeng/spark/tree/enhance_memory_manager. Change 3 is 
> already made, and I have a prototype of changes 1 and 2 to evict spillables 
> from the memory manager; it is still in progress. I will send a PR when it's done.
> Any feedback or thoughts on this change are highly appreciated!
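
The idea behind changes 1 and 2 can be sketched roughly as follows. This is a hypothetical, simplified illustration in Python, not Spark's actual ShuffleMemoryManager API: the class names, fields, and the "evict the largest other occupant" policy are all assumptions made for this sketch.

```python
# Hypothetical sketch of changes 1 and 2: the manager tracks memory per
# (thread, object) rather than per thread alone, and can force an existing
# occupant to spill when a new object in the same thread requests memory.
# None of these names correspond to real Spark classes.

class Spillable:
    """A consumer that can release its memory by spilling to disk."""
    def __init__(self, name):
        self.name = name
        self.held = 0     # bytes currently held in memory
        self.spills = 0   # how many times we were forced to spill

    def spill(self):
        freed = self.held
        self.held = 0
        self.spills += 1
        return freed

class MemoryManager:
    def __init__(self, total_mem, num_threads):
        self.quota = total_mem // num_threads   # per-thread cap
        self.holders = {}                        # thread_id -> {obj: bytes}

    def acquire(self, thread_id, obj, requested):
        held = self.holders.setdefault(thread_id, {})
        used = sum(held.values())
        if used + requested > self.quota:
            # Change 2: evict the largest *other* occupant in this thread
            # instead of refusing the request outright.
            victims = [o for o in held if o is not obj and held[o] > 0]
            if victims:
                victim = max(victims, key=held.get)
                victim.spill()
                held[victim] = 0
                used = sum(held.values())
        granted = min(requested, self.quota - used)
        if granted > 0:
            held[obj] = held.get(obj, 0) + granted
            obj.held += granted
        return granted

mgr = MemoryManager(total_mem=400, num_threads=4)  # quota = 100 per thread
map_side = Spillable("ExternalAppendOnlyMap")
sorter = Spillable("ExternalSorter")
mgr.acquire(thread_id=1, obj=map_side, requested=100)  # map takes the full quota
granted = mgr.acquire(thread_id=1, obj=sorter, requested=60)
# The map was forced to spill, so the sorter gets real memory
# instead of being starved into producing many tiny spill files.
```

Without the eviction step, the second `acquire` would be granted 0 bytes, which is exactly the starvation the issue describes.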



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2016-05-09 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15277583#comment-15277583
 ] 

Apache Spark commented on SPARK-4452:
-

User 'lianhuiwang' has created a pull request for this issue:
https://github.com/apache/spark/pull/13020




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2016-05-09 Thread Xin Hao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15276109#comment-15276109
 ] 

Xin Hao commented on SPARK-4452:


Since this is an old issue that has affected Spark since 1.1.0, can the patch be 
merged into Spark 1.6.x? Thanks.




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2016-04-25 Thread Davies Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15256602#comment-15256602
 ] 

Davies Liu commented on SPARK-4452:
---

We only backport critical bug fixes into released branches.

There is no 1.7.0; 2.0 will be released around June 2016.




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2016-04-24 Thread Romi Kuntsman (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15255658#comment-15255658
 ] 

Romi Kuntsman commented on SPARK-4452:
--

Hi, what's the reason this will only be available in Spark 2.0.0, and not 1.6.4 
or 1.7.0?




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2015-11-28 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15030450#comment-15030450
 ] 

Apache Spark commented on SPARK-4452:
-

User 'lianhuiwang' has created a pull request for this issue:
https://github.com/apache/spark/pull/10024




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2015-06-30 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14608618#comment-14608618
 ] 

Apache Spark commented on SPARK-4452:
-

User 'lianhuiwang' has created a pull request for this issue:
https://github.com/apache/spark/pull/7130




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2015-06-30 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14609144#comment-14609144
 ] 

Josh Rosen commented on SPARK-4452:
---

I've linked this to the Project Tungsten JIRA epic, since the increased use of 
spillable collections in the Tungsten code will magnify this issue.




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2015-01-24 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14290874#comment-14290874
 ] 

Sean Owen commented on SPARK-4452:
--

Can this JIRA be resolved now that its children are resolved, or is there more to 
this one?




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2015-01-24 Thread Sandy Ryza (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14290885#comment-14290885
 ] 

Sandy Ryza commented on SPARK-4452:
---

I think there's more to this one. The subtasks solved the most egregious 
issues, but shuffle data structures can still hog memory in the detrimental ways 
described in some of the comments above.




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-26 Thread Sandy Ryza (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14226969#comment-14226969
 ] 

Sandy Ryza commented on SPARK-4452:
---

Thinking about the current change a little more, an issue is that it will spill 
all the in-memory data to disk in situations where this is probably overkill.  
E.g., consider the typical case of shuffle data slightly exceeding memory: we 
end up spilling the entire data structure when a downstream data structure 
needs even a small amount of memory.

I think that your proposed change 2 is probably worthwhile.




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-25 Thread Tianshuo Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14224996#comment-14224996
 ] 

Tianshuo Deng commented on SPARK-4452:
--

[~sandyr]:
Thanks for the feedback!

For double counting: yes, the external data structure may result in double
counting, but it only applies to the in-memory portion of the data. In my PR,
in ExternalAppendOnlyMap, once the in-memory portion is spilled, the memory is
recycled (by giving it an empty iterator and an empty map).

So there are two approaches I could take:
1. A minor change on top of my current change: also recycle the memory when the
in-memory iterator is drained.
2. A slightly bigger change: make the in-memory iterator destructive by nulling
out the underlying element in the array when the element is returned. This also
requires the spillable data structure to report back the memory it frees while
the iterator is being consumed, whereas currently it only reports memory usage
when new data is inserted.

Change 1 seems to add fewer constraints to the spillable data structure; what
do you think?
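Approach 2 above can be sketched roughly as follows (a hypothetical illustration, not the actual ExternalAppendOnlyMap code; the fixed per-slot size and the release callback are simplifying assumptions):

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.LongConsumer;

// A destructive in-memory iterator: each slot is nulled out as it is
// consumed, and the freed bytes are reported back so the memory manager
// can hand that memory to other consumers mid-iteration.
class DestructiveIterator<T> implements Iterator<T> {
    private final Object[] slots;
    private final long bytesPerSlot;       // assumed uniform size for the sketch
    private final LongConsumer releaseFn;  // e.g. memoryManager::release
    private int pos = 0;

    DestructiveIterator(Object[] slots, long bytesPerSlot, LongConsumer releaseFn) {
        this.slots = slots;
        this.bytesPerSlot = bytesPerSlot;
        this.releaseFn = releaseFn;
    }

    public boolean hasNext() { return pos < slots.length; }

    @SuppressWarnings("unchecked")
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        T elem = (T) slots[pos];
        slots[pos] = null;               // drop the reference so it can be GC'd
        pos++;
        releaseFn.accept(bytesPerSlot);  // report the reclaimed memory
        return elem;
    }
}
```

The extra constraint this places on the data structure is exactly the one discussed: it must account for memory during consumption, not only during insertion.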






[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-23 Thread Sandy Ryza (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14222572#comment-14222572
 ] 

Sandy Ryza commented on SPARK-4452:
---

[~tianshuo], I took a look at the patch, and the general approach looks
reasonable to me.

A couple of additional thoughts that apply both to the current approach and to
Tianshuo's patch:
* When we chain an ExternalAppendOnlyMap to an ExternalSorter for processing
combined map outputs in sort-based shuffle, we end up double counting, no? Both
data structures will hold references to the same objects and estimate their
size based on them.
* We could make the in-memory iterators destructive as well, right? I.e., if
the data structures can release references to objects as they yield them, then
we can give memory back to the shuffle memory manager and make it available to
other data structures in the same thread.

If we can avoid double counting and holding on to unneeded objects, it would
obviate some of the need for intra-thread limits / forced spilling.




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-20 Thread Tianshuo Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14219795#comment-14219795
 ] 

Tianshuo Deng commented on SPARK-4452:
--

Hi, [~matei]:

My implementation is closer to the second approach you suggested. I will put up
a design doc, but I would like to give a preview of my implementation first.

I have already implemented the following, and it seems to work for me:

1. Memory allocation and spilling are divided into two levels:
SpillableTaskMemoryManager handles memory allocation and spilling within the
current thread/task, while ShuffleMemoryManager coordinates memory allocation
among threads/tasks.

2. SpillableTaskMemoryManager: objects are grouped by thread; each STMM maps
to one thread/task. If an object needs more memory, it asks the STMM for it.
The STMM asks the ShuffleMemoryManager for more memory for the current thread;
if the returned memory does not satisfy the request, it tries to spill objects
in the current thread to give up memory. Note that the objects it may spill are
thread-local, so there is no contention.

3. ShuffleMemoryManager: the per-thread memory-allocation algorithm is
basically unchanged. The only difference is that spillables no longer ask the
SMM directly for more memory; instead, the STMM asks for memory on behalf of
the thread.

With this change, spilling is triggered from the STMM. The design was made with
the following properties in mind:

- Incremental change: the thread memory-allocation algorithm is unchanged, so
each task/thread gets a fair share of memory.
- Spilling is thread-local and is triggered from the STMM to avoid unnecessary
locking and contention.
- Two levels of memory allocation draw a distinction between allocating memory
for tasks and allocating memory/spilling objects within the current task. This
distinction makes contention management clearer and easier.
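The two-level split described above might look roughly like this (a hypothetical sketch under the naming in the comment; the real branch's code will differ): the global manager only arbitrates bytes between threads, while each per-task manager spills its own thread-local spillables on a shortfall, without any global lock held during the spill decision.

```java
import java.util.ArrayList;
import java.util.List;

// Global level: coordinates raw bytes across threads/tasks.
class GlobalShuffleMemoryManager {
    private final long total;
    private long granted = 0;

    GlobalShuffleMemoryManager(long total) { this.total = total; }

    synchronized long acquire(long bytes) {
        long give = Math.min(bytes, total - granted);
        granted += give;
        return give;
    }

    synchronized void release(long bytes) { granted -= bytes; }
}

// Task level: one instance per thread/task, owning that task's spillables.
class SpillableTaskMemoryManager {
    interface LocalSpillable { long forceSpill(); }

    private final GlobalShuffleMemoryManager global;
    private final List<LocalSpillable> owned = new ArrayList<>(); // thread-local

    SpillableTaskMemoryManager(GlobalShuffleMemoryManager global) {
        this.global = global;
    }

    void register(LocalSpillable s) { owned.add(s); }

    long acquireFor(LocalSpillable requester, long bytes) {
        long got = global.acquire(bytes);
        if (got < bytes) {
            // Shortfall: spill the *other* spillables owned by this task.
            // They are thread-local, so no cross-thread contention here.
            for (LocalSpillable s : owned) {
                if (s != requester && got < bytes) {
                    long freed = s.forceSpill();
                    global.release(freed);
                    got += global.acquire(bytes - got);
                }
            }
        }
        return got;
    }
}
```

The point of the split is visible in `acquireFor`: the global manager never needs to know which object holds the memory, and forced spilling never crosses a thread boundary.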




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-20 Thread Tianshuo Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14220070#comment-14220070
 ] 

Tianshuo Deng commented on SPARK-4452:
--

Here is a link to the diff:
https://github.com/tsdeng/spark/compare/fix_memory_starvation?expand=1
Note that there are tons of logInfo calls that I will remove later, and I
haven't added detailed comments yet.




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-19 Thread Tianshuo Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14218289#comment-14218289
 ] 

Tianshuo Deng commented on SPARK-4452:
--

Hi,
While working on this ticket, I ran into a related question:
I noticed an extra constraint in the usage of ExternalAppendOnlyMap.
Even in the current implementation (master), if an ExternalAppendOnlyMap
has exported an iterator (spilled), you cannot get the iterator again, since
the in-memory iterator is destructive.
But in our unit tests, this constraint seems to be ignored: many tests call
iterator multiple times. They work because the data is small and does not
trigger spilling in the unit tests.

I just want to confirm that it's OK for me to explicitly add this constraint to
the code and unit tests: the iterator of an ExternalAppendOnlyMap can only be
exported once.




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-19 Thread Andrew Or (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14218318#comment-14218318
 ] 

Andrew Or commented on SPARK-4452:
--

[~tianshuo] That is a correct assumption for ExternalAppendOnlyMap: once it has
spilled and we have called `iterator`, which destroys the underlying map, we
should not be able to call `iterator` again or insert any items into the map.
We should really document that clearly, but your understanding is correct.
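One minimal way to make this implicit contract explicit (a hypothetical helper for illustration, not ExternalAppendOnlyMap's actual code) is to fail fast on a second `iterator` call:

```java
// Wraps a destructive iterator so that requesting it twice throws,
// mirroring the "can only be exported once" constraint discussed above.
class OneShotIterable<T> {
    private java.util.Iterator<T> it;

    OneShotIterable(java.util.Iterator<T> it) { this.it = it; }

    java.util.Iterator<T> iterator() {
        if (it == null) {
            throw new IllegalStateException("iterator() may only be called once");
        }
        java.util.Iterator<T> out = it;
        it = null; // drop the reference, mirroring the destroyed map
        return out;
    }
}
```

With a guard like this, tests that accidentally call iterator twice would fail loudly even on small data that never spills.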




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-18 Thread Matei Zaharia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14216691#comment-14216691
 ] 

Matei Zaharia commented on SPARK-4452:
--

BTW I've thought about this more and here's what I'd suggest: try a version 
where each object is allowed to ramp up to a certain size (say 5 MB) before 
being subject to the limit, and if that doesn't work, then maybe go for the 
forced-spilling one. The reason is that as soon as N objects are active, the 
ShuffleMemoryManager will not let any object ramp up to more than 1/N, so it 
just has to fill up its current quota and stop. This means that scenarios with 
very little free memory might only happen at the beginning (when tasks start 
up). If we can make this work, then we avoid a lot of concurrency problems that 
would happen with forced spilling. 

Another improvement would be to make the Spillables request less than 2x their 
current memory when they ramp up, e.g. 1.5x. They'd then make more requests but 
it would lead to slower ramp-up and more of a chance for other threads to grab 
memory. But I think this will have less impact than simply increasing that free 
minimum amount.
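The suggestion above can be sketched as a small grant policy (the 5 MB floor and the 1.5x growth factor are the values floated in the comment; the class and method names are hypothetical):

```java
// Each consumer may grow freely up to a floor before the 1/N fair-share
// cap applies, and requests 1.5x (rather than 2x) its current size.
class RampUpPolicy {
    static final long FLOOR = 5L << 20; // 5 MB grace per consumer

    // How many additional bytes a consumer holding `current` requests next:
    // growing to 1.5x current means asking for current / 2 more.
    static long nextRequest(long current) {
        return Math.max(1, current / 2);
    }

    // Below the floor, ignore the fair share; above it, cap at 1/N.
    static long grant(long current, long requested, long totalMem, int numConsumers) {
        long fairShare = totalMem / numConsumers;
        long cap = Math.max(FLOOR, fairShare);
        long allowed = cap - current;
        return Math.max(0, Math.min(requested, allowed));
    }
}
```

Under this policy a late-arriving ExternalSorter is guaranteed to ramp up to at least the floor, which addresses the starvation scenario without any forced spilling.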




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-18 Thread Sandy Ryza (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14216933#comment-14216933
 ] 

Sandy Ryza commented on SPARK-4452:
---

One issue with a limits-by-object approach is that it could result in extra
wasted memory over the current approach for tasks that produce less shuffle
data than they read. E.g., consider an
rdd.reduceByKey(...).map(...).reduceByKey(...) chain.

The object aggregating inputs used to have access to the full memory allotted
to the task, but now it only gets half the memory. In situations where the
object aggregating outputs doesn't need as much memory (because there is less
output data), some of the memory that previously would have been used goes unused.

A forced spilling approach seems like it could give some of the advantages that 
preemption provides in cluster scheduling - better utilization through enabling 
objects to use more than their fair amount until it turns out other objects 
need those resources.


 Shuffle data structures can starve others on the same thread for memory 
 

 Key: SPARK-4452
 URL: https://issues.apache.org/jira/browse/SPARK-4452
 Project: Spark
  Issue Type: Bug
Affects Versions: 1.1.0
Reporter: Tianshuo Deng
Assignee: Tianshuo Deng
Priority: Blocker

 When an Aggregator is used with an ExternalSorter in a task, Spark creates 
 many small files, which can cause a "too many open files" error during 
 merging.
 Currently, ShuffleMemoryManager does not work well when there are two 
 spillable objects in a thread, which in this case are ExternalSorter and 
 ExternalAppendOnlyMap (used by the Aggregator). Here is an example: due to 
 map-side aggregation, the ExternalAppendOnlyMap is created first to read the 
 RDD. It may ask for as much memory as it can get, which is 
 totalMem/numberOfThreads. When an ExternalSorter is later created in the same 
 thread, the ShuffleMemoryManager can refuse to allocate it more memory, 
 because the memory has already been given to the earlier requester 
 (ExternalAppendOnlyMap). That leaves the ExternalSorter spilling a stream of 
 small files due to the lack of memory.
 I'm currently working on a PR to address these two issues. It will include 
 the following changes:
 1. The ShuffleMemoryManager should track not only the memory usage of each 
 thread but also which object holds the memory.
 2. The ShuffleMemoryManager should be able to trigger the spilling of a 
 spillable object. That way, if a new object in a thread requests memory, the 
 old occupant can be evicted/spilled. Previously, spillable objects triggered 
 spilling themselves, so one might not spill even when another object in the 
 same thread needed more memory. After this change, the ShuffleMemoryManager 
 can trigger the spilling of an object whenever it needs to.
 3. Make the iterator of ExternalAppendOnlyMap spillable. Previously, 
 ExternalAppendOnlyMap returned a destructive iterator and could not be 
 spilled after the iterator was returned. This should change so that the 
 ShuffleMemoryManager can still spill the map even after its iterator has been 
 returned.
 Currently, I have a working branch in progress: 
 https://github.com/tsdeng/spark/tree/enhance_memory_manager. I have already 
 made change 3 and have a prototype of changes 1 and 2 to evict spillables 
 from the memory manager, still in progress. I will send a PR when it's done.
 Any feedback or thoughts on this change are highly appreciated!
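
The starvation sequence in the report can be reduced to a small model. This is an illustrative sketch, not Spark's ShuffleMemoryManager (the real one grants each thread a dynamic share of the pool, but the failure mode is the same): both spillable objects in a thread draw from one quota, so whichever ramps up first starves the other.

```scala
import scala.collection.mutable

// Simplified per-thread accounting: memory is tracked per thread only, so
// two spillable objects sharing a thread also share a single quota.
class PerThreadAccounting(totalMem: Long, numThreads: Int) {
  private val usedByThread = mutable.Map[Long, Long]().withDefaultValue(0L)
  private val cap = totalMem / numThreads // the totalMem/numberOfThreads cap

  // Returns how many bytes the caller actually receives.
  def tryToAcquire(threadId: Long, bytes: Long): Long = synchronized {
    val grant = math.min(bytes, cap - usedByThread(threadId))
    usedByThread(threadId) += grant
    grant
  }
}

val mgr = new PerThreadAccounting(totalMem = 800L, numThreads = 8) // cap: 100
// The ExternalAppendOnlyMap (map-side aggregation) ramps up first in thread 1:
val mapGot = mgr.tryToAcquire(threadId = 1L, bytes = 100L)
// The ExternalSorter created later in the same thread is refused any memory,
// so it spills a small file for every buffer it fills:
val sorterGot = mgr.tryToAcquire(threadId = 1L, bytes = 50L)
```

Because the manager cannot see which object holds the thread's quota, it has no way to reclaim memory from the map on the sorter's behalf, which is exactly what changes 1 and 2 above address.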



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-18 Thread Andrew Or (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14217253#comment-14217253
 ] 

Andrew Or commented on SPARK-4452:
--

I have opened a JIRA that aims to fix this at a smaller scope: SPARK-4480. I 
intend to pull that smaller fix into 1.1.1, and maybe it's sufficient for 
1.2.0. This JIRA (SPARK-4452) likely involves a much bigger change that is too 
ambitious for either release at the moment.




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-18 Thread Andrew Or (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14217274#comment-14217274
 ] 

Andrew Or commented on SPARK-4452:
--

[~matei] I have implemented your first suggestion here: 
https://github.com/apache/spark/pull/3353. In my particular workload, I've 
noticed at least an order of magnitude reduction in the number of shuffle files 
written. More details provided in the PR.




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-18 Thread Matei Zaharia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14217331#comment-14217331
 ] 

Matei Zaharia commented on SPARK-4452:
--

Forced spilling is actually orthogonal to how you set the limits. For example, 
if there are N objects, one way to set limits is to reserve at least 1/N of 
the memory for each one. Another would be to group objects by thread and use a 
different allocation algorithm within a thread (e.g. raise an object's cap 
when the other objects in its thread are using less). Whether or not you force 
spilling, you'll have to decide what the right limit for each thing is.
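
The two limit-setting policies being contrasted can be written out as formulas. These are illustrative helpers with assumed names, not Spark code:

```scala
// (a) Flat per-object reservation: each of nObjects is guaranteed an
// equal slice of the pool, regardless of which thread it lives in.
def flatObjectCap(total: Long, nObjects: Int): Long = total / nObjects

// (b) Thread-grouped: split the pool across threads first, then let an
// object within a thread use whatever its sibling objects leave idle.
def threadAwareCap(total: Long, numThreads: Int, siblingUsage: Long): Long = {
  val threadShare = total / numThreads
  math.max(0L, threadShare - siblingUsage)
}

// With 4 objects in 2 threads and a 120-byte pool: the flat policy gives
// each object 30 bytes, while the thread-grouped cap lets an object whose
// sibling only uses 10 bytes grow to 50.
val flat = flatObjectCap(120L, 4)
val grouped = threadAwareCap(120L, numThreads = 2, siblingUsage = 10L)
```

Either policy still leaves the question above open: some component has to decide when a holder above its cap must give memory back.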




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-18 Thread Sandy Ryza (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14217340#comment-14217340
 ] 

Sandy Ryza commented on SPARK-4452:
---

[~matei] my point is not that forced spilling lets us avoid setting limits, 
but that it allows those limits to be soft: if an entity (thread or object) is 
not requesting the 1/N of memory reserved for it, that memory can be given to 
other entities that need it. Then, if the entity later requests the memory 
reserved for it, the entities above their fair allocation can be forced to 
spill.

(I don't necessarily mean to argue that this advantage is worth the added 
complexity.)




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-17 Thread Sandy Ryza (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14215269#comment-14215269
 ] 

Sandy Ryza commented on SPARK-4452:
---

Updated the title to reflect the specific problem.

 Shuffle data structures can starve others on the same thread for memory 
 

 Key: SPARK-4452
 URL: https://issues.apache.org/jira/browse/SPARK-4452
 Project: Spark
  Issue Type: Bug
Affects Versions: 1.1.0
Reporter: tianshuo

 When an Aggregator is used with an ExternalSorter in a task, Spark creates 
 many small files, which can cause a "too many open files" error during 
 merging. This happens when using the sort-based shuffle. The issue is caused 
 by multiple factors:
 1. There seems to be a bug in setting the elementsRead variable in 
 ExternalSorter, which renders trackMemoryThreshold (defined in Spillable) 
 useless for triggering spills; the PR to fix it is 
 https://github.com/apache/spark/pull/3302
 2. The current ShuffleMemoryManager does not work well when there are two 
 spillable objects in a thread, which in this case are ExternalSorter and 
 ExternalAppendOnlyMap (used by the Aggregator). Here is an example: due to 
 map-side aggregation, the ExternalAppendOnlyMap is created first to read the 
 RDD. It may ask for as much memory as it can get, which is 
 totalMem/numberOfThreads. When an ExternalSorter is later created in the same 
 thread, the ShuffleMemoryManager can refuse to allocate it more memory, 
 because the memory has already been given to the earlier requester 
 (ExternalAppendOnlyMap). That leaves the ExternalSorter spilling a stream of 
 small files due to the lack of memory.
 I'm currently working on a PR to address these two issues. It will include 
 the following changes:
 1. The ShuffleMemoryManager should track not only the memory usage of each 
 thread but also which object holds the memory.
 2. The ShuffleMemoryManager should be able to trigger the spilling of a 
 spillable object. That way, if a new object in a thread requests memory, the 
 old occupant can be evicted/spilled, which avoids problem 2. Previously, 
 spillable objects triggered spilling themselves, so one might not spill even 
 when another object in the same thread needed more memory. After this change, 
 the ShuffleMemoryManager can trigger the spilling of an object whenever it 
 needs to.
 3. Make the iterator of ExternalAppendOnlyMap spillable. Previously, 
 ExternalAppendOnlyMap returned a destructive iterator and could not be 
 spilled after the iterator was returned. This should change so that the 
 ShuffleMemoryManager can still spill the map even after its iterator has been 
 returned.
 Currently, I have a working branch in progress: 
 https://github.com/tsdeng/spark/tree/enhance_memory_manager
 I have already made change 3 and have a prototype of changes 1 and 2 to evict 
 spillables from the memory manager, still in progress. I will send a PR when 
 it's done.
 Any feedback or thoughts on this change are highly appreciated!






[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-17 Thread Andrew Or (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14215395#comment-14215395
 ] 

Andrew Or commented on SPARK-4452:
--

Hey [~tianshuo] do you see this issue only for sort-based shuffle? Have you 
been able to reproduce it on hash-based shuffle?




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-17 Thread Tianshuo Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14215411#comment-14215411
 ] 

Tianshuo Deng commented on SPARK-4452:
--

Hi [~andrewor14],
Actually, hash-based shuffle does not fare as badly as sort-based shuffle on 
this particular problem; we were able to bypass it by using hash-based 
shuffle. The problem was also aggravated for me by the elementsRead bug, which 
could be another reason hash-based shuffle didn't break as badly.




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-17 Thread Tianshuo Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14215418#comment-14215418
 ] 

Tianshuo Deng commented on SPARK-4452:
--

Hi [~andrewor14],
The elementsRead bug that made the situation so bad and led to the "too many 
open files" error is fixed here: https://github.com/apache/spark/pull/3302.
I will soon send another PR for the memory-starvation problem described in 
this ticket.




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-17 Thread Matei Zaharia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14215425#comment-14215425
 ] 

Matei Zaharia commented on SPARK-4452:
--

How much of this gets fixed if you fix the elementsRead bug in ExternalSorter?

With forcing data structures to spill, the problem is that it will introduce 
complexity in every spillable data structure. I wonder if we can make it just 
give out memory in smaller increments, so that threads check whether they 
should spill more often. In addition, we can set a better minimum or maximum on 
each thread (e.g. always let it ramp up to, say, 5 MB, or some fraction of the 
memory space).
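
One possible reading of the smaller-increments idea, as an illustrative sketch (the page size and all names here are assumptions, not Spark's): grants come one page at a time, so a consumer re-checks its spill threshold between pages rather than after one large allocation.

```scala
// Hand out memory one page at a time from a shared pool; the caller loops
// over nextPage and may decide to spill between calls.
class IncrementalGrants(var free: Long, pageSize: Long) {
  def nextPage(stillNeeded: Long): Long = synchronized {
    val give = math.min(math.min(pageSize, stillNeeded), free)
    free -= give
    give
  }
}

// A 5-byte pool granted in 2-byte pages yields 2, 2, 1, then 0.
val pool = new IncrementalGrants(free = 5L, pageSize = 2L)
val pages = List.fill(4)(pool.nextPage(10L))
```

The attraction of this scheme is the one stated above: no spillable structure needs a forced-spill hook, because each one notices memory pressure on its next small request.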

I do like the idea of making the ShuffleMemoryManager track limits per object. 
I actually considered this when I wrote that and didn't do it, possibly because 
it would've created more complexity in figuring out when an object is done. But 
it seems like it should be straightforward to add in, as long as you also track 
which objects come from which thread so that you can still 
releaseMemoryForThisThread() to clean up.
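
The per-object tracking plus per-thread cleanup could look roughly like this. All names are illustrative; this is not the real ShuffleMemoryManager:

```scala
import scala.collection.mutable

// Grants keyed by (thread, object): each object gets its own accounting,
// while a whole thread's memory can still be swept when its task finishes.
class ObjectTrackingMemoryManager(total: Long) {
  private val grants =
    mutable.Map[(Long, String), Long]().withDefaultValue(0L)

  def acquire(threadId: Long, objectId: String, bytes: Long): Long =
    synchronized {
      val free = total - grants.values.sum
      val give = math.min(bytes, free)
      grants((threadId, objectId)) += give
      give
    }

  // The per-thread cleanup path stays simple: drop every grant whose key
  // belongs to this thread, and report how much was freed.
  def releaseMemoryForThread(threadId: Long): Long = synchronized {
    val keys = grants.keys.filter(_._1 == threadId).toList
    val freed = keys.map(grants).sum
    keys.foreach(grants -= _)
    freed
  }
}

val mgr = new ObjectTrackingMemoryManager(100L)
val mapGot = mgr.acquire(1L, "ExternalAppendOnlyMap", 60L)
val sorterGot = mgr.acquire(1L, "ExternalSorter", 60L) // only 40 bytes left
val freed = mgr.releaseMemoryForThread(1L)             // frees both grants
```

Keying by the pair rather than by object alone is what keeps the "figure out when an object is done" concern contained: task teardown releases everything for the thread without each object having to deregister itself.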

 Shuffle data structures can starve others on the same thread for memory 
 

 Key: SPARK-4452
 URL: https://issues.apache.org/jira/browse/SPARK-4452
 Project: Spark
  Issue Type: Bug
Affects Versions: 1.1.0
Reporter: Tianshuo Deng

 When an Aggregator is used with an ExternalSorter in a task, Spark creates 
 many small files and can hit a "too many open files" error during merging.
 This happens when using the sort-based shuffle. The issue is caused by 
 multiple factors:
 1. There appears to be a bug in setting the elementsRead variable in 
 ExternalSorter, which renders trackMemoryThreshold (defined in Spillable) 
 useless for triggering spilling. The PR to fix it is 
 https://github.com/apache/spark/pull/3302
 2. The current ShuffleMemoryManager does not work well when there are two 
 spillable objects in a thread, which in this case are ExternalSorter and 
 ExternalAppendOnlyMap (used by Aggregator). Here is an example: due to the 
 use of map-side aggregation, ExternalAppendOnlyMap is created first to read 
 the RDD. It may ask for as much memory as it can get, which is 
 totalMem/numberOfThreads. When ExternalSorter is later created in the same 
 thread, the ShuffleMemoryManager may refuse to allocate more memory to it, 
 since the memory has already been given to the previously requesting object 
 (ExternalAppendOnlyMap). That causes the ExternalSorter to keep spilling 
 small files due to the lack of memory.
 I'm currently working on a PR to address these two issues. It will include 
 the following changes:
 1. The ShuffleMemoryManager should track not only the memory usage of each 
 thread, but also which object holds the memory.
 2. The ShuffleMemoryManager should be able to trigger the spilling of a 
 spillable object. That way, if a new object in a thread requests memory, the 
 old occupant can be evicted/spilled, which prevents problem 2. Previously, 
 spillable objects triggered spilling by themselves, so one might not spill 
 even when another object in the same thread needed more memory. After this 
 change, the ShuffleMemoryManager can trigger the spilling of an object 
 whenever it needs to.
 3. Make the iterator of ExternalAppendOnlyMap spillable. Previously, 
 ExternalAppendOnlyMap returned a destructive iterator and could not be 
 spilled after the iterator was returned. This should change so that the 
 ShuffleMemoryManager can still spill the map even after the iterator has 
 been returned.
 Currently, I have a working branch in progress: 
 https://github.com/tsdeng/spark/tree/enhance_memory_manager 
 I have already made change 3 and have a prototype of changes 1 and 2 to 
 evict spillables from the memory manager; it is still in progress.
 I will send a PR when it's done.
 Any feedback or thoughts on this change are highly appreciated!
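The two manager-side changes above (1 and 2) can be sketched as a toy model. This is an illustrative Python sketch under assumed names (`Spillable`, `ForcingMemoryManager`), not Spark's actual implementation: the manager tracks which spillables hold memory on each thread, and when a new request cannot be satisfied it forces the other occupants on that thread to spill instead of letting the new object starve.

```python
class Spillable:
    """Toy spillable: holds in-memory bytes and can be told to spill."""
    def __init__(self, name):
        self.name = name
        self.in_memory = 0
        self.spill_count = 0

    def spill(self):
        freed = self.in_memory
        self.in_memory = 0
        self.spill_count += 1
        return freed


class ForcingMemoryManager:
    def __init__(self, max_memory):
        self.max_memory = max_memory
        self.holders = {}  # thread_id -> list of Spillable (change 1: track owners)

    def _used(self):
        return sum(s.in_memory for hs in self.holders.values() for s in hs)

    def acquire(self, thread_id, spillable, num_bytes):
        holders = self.holders.setdefault(thread_id, [])
        if spillable not in holders:
            holders.append(spillable)
        free = self.max_memory - self._used()
        if free < num_bytes:
            # Change 2: evict other occupants on this thread instead of
            # letting the new object starve and spill tiny files forever.
            for other in holders:
                if other is not spillable and other.in_memory > 0:
                    free += other.spill()
                if free >= num_bytes:
                    break
        granted = min(num_bytes, max(free, 0))
        spillable.in_memory += granted
        return granted
```

In the starvation scenario from the description, the ExternalAppendOnlyMap grabs all the memory first; with this model the manager forces it to spill once so the ExternalSorter gets a real allocation instead of repeatedly spilling tiny files.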



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-17 Thread Andrew Or (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14215427#comment-14215427
 ] 

Andrew Or commented on SPARK-4452:
--

I see, in other words, there are two separate issues affecting sort-based 
shuffle:

1. The `elementsRead` variable is not updated
2. External data structures starve each other if they're in the same thread

where (2) also applies to hash-based shuffle. Your PR 
https://github.com/apache/spark/pull/3302 fixes (1), but we still need to 
address (2) at some point. Still, fixing (1) is important on its own, because 
previously we just spilled unconditionally every 32 records after a while.




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-17 Thread Tianshuo Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14215434#comment-14215434
 ] 

Tianshuo Deng commented on SPARK-4452:
--

Hi, [~andrewor14],
Yeah, exactly. This ticket is mainly about addressing problem (2); I have a 
branch in progress for that: 
https://github.com/tsdeng/spark/tree/enhance_memory_manager

It's still a prototype, but it greatly alleviates the problem for us. I'm 
just trying to finalize it.




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-17 Thread Sandy Ryza (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14215436#comment-14215436
 ] 

Sandy Ryza commented on SPARK-4452:
---

[~andrewor14], IIUC, (2) shouldn't happen in hash-based shuffle at all, because 
hash-based shuffle doesn't use multiple spillable data structures in each task.




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-17 Thread Andrew Or (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14215446#comment-14215446
 ] 

Andrew Or commented on SPARK-4452:
--

[~sandyr] hash-based shuffle can still use two ExternalAppendOnlyMaps in one 
task if you have back-to-back shuffles where the second shuffle does a 
map-side combine.




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-17 Thread Sandy Ryza (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14215448#comment-14215448
 ] 

Sandy Ryza commented on SPARK-4452:
---

Ah, true.




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-17 Thread Tianshuo Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14215449#comment-14215449
 ] 

Tianshuo Deng commented on SPARK-4452:
--

[~matei]:
You are right, it does add more complexity if we force the data structures to 
spill. But in my prototype branch I have already made changes to 
ExternalSorter and ExternalAppendOnlyMap to support that, and it turned out 
to be doable.
In terms of coding it does add complexity, but the property we get from it is 
quite nice: the ability to spill an object whenever we need to.
Also, ExternalSorter and ExternalAppendOnlyMap are the only two classes that 
need to be changed.

To answer your question: after fixing the elementsRead bug, we no longer see 
the exception, but we still see tons of small files due to the memory 
starvation.
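Change 3 from the description, making the returned iterator itself spillable, can be illustrated with a toy model. This is a hypothetical Python sketch (the class name `SpillableIterator` and the in-memory/disk lists are illustrative, not Spark's API): instead of a purely destructive in-memory iterator, the iterator keeps enough state that a spill() mid-iteration can move the not-yet-consumed entries out of memory and continue iterating from "disk".

```python
class SpillableIterator:
    """Toy model: iterates in-memory entries but can be spilled mid-iteration.

    On spill(), the unconsumed entries move to a 'disk' list (simulating a
    spill file), the in-memory buffer is dropped, and iteration continues
    from disk without the caller noticing.
    """
    def __init__(self, entries):
        self.memory = list(entries)  # not-yet-consumed in-memory entries
        self.disk = []               # stands in for a spill file on disk
        self.pos = 0

    def spill(self):
        # Move everything unconsumed out of memory; returns entries freed.
        remaining = self.memory[self.pos:]
        self.disk.extend(remaining)
        self.memory = []
        self.pos = 0
        return len(remaining)

    def __iter__(self):
        return self

    def __next__(self):
        if self.pos < len(self.memory):
            v = self.memory[self.pos]
            self.pos += 1
            return v
        if self.disk:
            return self.disk.pop(0)
        raise StopIteration
```

The point of the design is visible here: because the iterator owns its remaining state, a memory manager holding a reference to it can trigger spill() at any time, even after the map has handed the iterator to the caller.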





[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-17 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14215491#comment-14215491
 ] 

Apache Spark commented on SPARK-4452:
-

User 'andrewor14' has created a pull request for this issue:
https://github.com/apache/spark/pull/3330




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-17 Thread Matei Zaharia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14215557#comment-14215557
 ] 

Matei Zaharia commented on SPARK-4452:
--

BTW we may also want to create a separate JIRA for the short-term fix for 1.1 
and 1.2.




[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-17 Thread Matei Zaharia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14215556#comment-14215556
 ] 

Matei Zaharia commented on SPARK-4452:
--

Got it. It would be fine to do this if you found it to help; I was just 
wondering whether simpler fixes would get us far enough. For the forced 
spilling change, I'd suggest writing a short design doc, or making sure the 
comments in the code are very detailed (essentially a design doc at the top 
of the class). Forced spilling can have a lot of tricky cases due to 
concurrency, so it's important to document the design.

 Shuffle data structures can starve others on the same thread for memory 
 

 Key: SPARK-4452
 URL: https://issues.apache.org/jira/browse/SPARK-4452
 Project: Spark
  Issue Type: Bug
Affects Versions: 1.1.0
Reporter: Tianshuo Deng
Assignee: Tianshuo Deng
Priority: Blocker

 When an Aggregator is used with ExternalSorter in a task, spark will create 
 many small files and could cause too many files open error during merging.
 This happens when using the sort-based shuffle. The issue is caused by 
 multiple factors:
 1. There seems to be a bug in setting the elementsRead variable in 
 ExternalSorter, which renders the trackMemoryThreshold(defined in Spillable) 
 useless for triggering spilling, the pr to fix it is 
 https://github.com/apache/spark/pull/3302
 2. The current ShuffleMemoryManager does not work well when there are two 
 spillable objects in a thread, which here are ExternalSorter and the 
 ExternalAppendOnlyMap used by the Aggregator. For example: because of map-side 
 aggregation, an ExternalAppendOnlyMap is created first to read the RDD. It may 
 ask for as much memory as it can get, which is totalMem/numberOfThreads. When 
 an ExternalSorter is later created in the same thread, the 
 ShuffleMemoryManager may refuse to allocate it any more memory, since the 
 memory has already been given to the previously requesting object (the 
 ExternalAppendOnlyMap). As a result, the ExternalSorter keeps spilling small 
 files due to the lack of memory.
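The quota policy described above can be modeled with a short sketch (Java, with hypothetical names; this is not Spark's actual ShuffleMemoryManager API): each thread is capped at totalMemory/numThreads with no per-object accounting, so whichever consumer asks first can exhaust the whole share for its thread.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical names) of the 1.1-era policy: a thread may hold at
// most totalMemory / numThreads bytes, and the manager does not track which
// object within the thread holds the memory.
class ToyShuffleMemoryManager {
    private final long totalMemory;
    private final int numThreads;
    private final Map<Long, Long> bytesPerThread = new HashMap<>();

    ToyShuffleMemoryManager(long totalMemory, int numThreads) {
        this.totalMemory = totalMemory;
        this.numThreads = numThreads;
    }

    // Grant as much of the request as the thread's quota allows.
    synchronized long tryToAcquire(long threadId, long requested) {
        long quota = totalMemory / numThreads;
        long held = bytesPerThread.getOrDefault(threadId, 0L);
        long granted = Math.min(requested, Math.max(0L, quota - held));
        bytesPerThread.put(threadId, held + granted);
        return granted;
    }
}

class StarvationDemo {
    public static void main(String[] args) {
        ToyShuffleMemoryManager mgr = new ToyShuffleMemoryManager(1024, 4);
        long tid = Thread.currentThread().getId();
        // The ExternalAppendOnlyMap asks first and takes the whole quota...
        long mapBytes = mgr.tryToAcquire(tid, 1024);
        // ...so the ExternalSorter, created later on the same thread, gets
        // nothing and must spill tiny files on every insert.
        long sorterBytes = mgr.tryToAcquire(tid, 256);
        System.out.println(mapBytes + " " + sorterBytes); // prints "256 0"
    }
}
```

With totalMemory = 1024 and 4 threads, the first requester receives the full 256-byte quota and the second receives zero, which is exactly the starvation described above.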
 I'm currently working on a PR to address these two issues. It will include the 
 following changes:
 1. The ShuffleMemoryManager should track not only the memory usage of each 
 thread but also which object holds the memory.
 2. The ShuffleMemoryManager should be able to trigger the spilling of a 
 spillable object, so that when a new object in a thread requests memory, the 
 old occupant can be evicted/spilled. This prevents problem 2 above: 
 previously, spillable objects triggered spilling by themselves, so one object 
 might never spill even when another object in the same thread needed more 
 memory. After this change, the ShuffleMemoryManager can trigger the spilling 
 of an object whenever it needs to.
 3. Make the iterator of ExternalAppendOnlyMap spillable. Previously, 
 ExternalAppendOnlyMap returned a destructive iterator and could not be spilled 
 once the iterator was handed out. This should be changed so that the 
 ShuffleMemoryManager can still spill the map even after the iterator is 
 returned.
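Changes 1 and 2 could be sketched roughly as follows (illustrative names only, not Spark's real classes): the manager accounts memory per (thread, consumer) pair, and when a newcomer cannot be satisfied it forces the largest other occupant on that thread to spill instead of silently refusing.

```java
import java.util.HashMap;
import java.util.Map;

// A consumer the manager can force to release its memory.
interface SpillableConsumer {
    long spill(); // release memory, return bytes freed
}

// Hypothetical sketch of the proposed direction: per-object accounting plus
// manager-initiated eviction. Not Spark's actual implementation.
class EvictingMemoryManager {
    private final long quotaPerThread;
    private final Map<Long, Map<SpillableConsumer, Long>> held = new HashMap<>();

    EvictingMemoryManager(long quotaPerThread) {
        this.quotaPerThread = quotaPerThread;
    }

    synchronized long acquire(long tid, SpillableConsumer who, long requested) {
        Map<SpillableConsumer, Long> owners =
            held.computeIfAbsent(tid, k -> new HashMap<>());
        long used = 0;
        for (long v : owners.values()) used += v;
        if (used + requested > quotaPerThread) {
            // Evict the largest other occupant on this thread.
            SpillableConsumer victim = null;
            long largest = 0;
            for (Map.Entry<SpillableConsumer, Long> e : owners.entrySet()) {
                if (e.getKey() != who && e.getValue() > largest) {
                    victim = e.getKey();
                    largest = e.getValue();
                }
            }
            if (victim != null) {
                victim.spill();
                owners.remove(victim);
                used -= largest;
            }
        }
        long granted = Math.min(requested, quotaPerThread - used);
        if (granted < 0) granted = 0;
        owners.merge(who, granted, Long::sum);
        return granted;
    }
}

class EvictionDemo {
    public static void main(String[] args) {
        EvictingMemoryManager mgr = new EvictingMemoryManager(256);
        SpillableConsumer map = () -> 256L;  // stands in for ExternalAppendOnlyMap
        SpillableConsumer sorter = () -> 0L; // stands in for ExternalSorter
        System.out.println(mgr.acquire(1L, map, 256));    // map takes the quota: 256
        System.out.println(mgr.acquire(1L, sorter, 128)); // map is spilled; sorter gets 128
    }
}
```

As the issue notes, the real version has tricky concurrency cases (e.g. a victim spilling while it is mid-request), which is why a design doc was requested.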
 Currently, I have a working branch in progress: 
 https://github.com/tsdeng/spark/tree/enhance_memory_manager 
 Change 3 is already made, and a prototype of changes 1 and 2 (evicting 
 spillables from the memory manager) is still in progress. 
 I will send a PR when it's done.
 Any feedback or thoughts on this change are highly appreciated!
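Change 3 above, the spillable iterator, might look something like this sketch (hypothetical; Spark's real version spills to disk files): the unread tail of the iterator can be moved out of memory mid-iteration by the manager and read back transparently afterwards.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustrative sketch (not Spark's actual implementation): an iterator whose
// unread tail can be spilled out of memory mid-iteration and read back, so
// the memory manager can reclaim it at any point.
class SpillableIterator implements Iterator<Integer> {
    private final ArrayDeque<Integer> inMemory;
    private List<Integer> spilledFile; // stands in for an on-disk spill file

    SpillableIterator(List<Integer> data) {
        inMemory = new ArrayDeque<>(data);
    }

    // Called by the memory manager: move the unread tail "to disk" and
    // report how many bytes were freed (pretend 4 bytes per element).
    synchronized long spill() {
        if (spilledFile == null && !inMemory.isEmpty()) {
            spilledFile = new ArrayList<>(inMemory);
            long freed = inMemory.size() * 4L;
            inMemory.clear();
            return freed;
        }
        return 0L;
    }

    @Override
    public synchronized boolean hasNext() {
        return !inMemory.isEmpty()
            || (spilledFile != null && !spilledFile.isEmpty());
    }

    @Override
    public synchronized Integer next() {
        if (!inMemory.isEmpty()) return inMemory.poll();
        return spilledFile.remove(0); // read back from the "file"
    }
}

class SpillableIteratorDemo {
    public static void main(String[] args) {
        SpillableIterator it = new SpillableIterator(Arrays.asList(1, 2, 3, 4));
        int sum = it.next() + it.next(); // consume part of the data in memory
        it.spill();                      // memory manager reclaims the tail
        while (it.hasNext()) sum += it.next(); // iteration resumes from "disk"
        System.out.println(sum); // prints "10"
    }
}
```

The key contrast with the old destructive iterator is that spill() can be invoked by another party between calls to next() without breaking iteration.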



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory

2014-11-17 Thread Andrew Or (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14215617#comment-14215617
 ] 

Andrew Or commented on SPARK-4452:
--

I have created SPARK-4467 for the `elementsRead` bug since the bigger issue 
here is distinct.



