[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-09-02 Thread Patrick Wendell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14118455#comment-14118455
 ] 

Patrick Wendell commented on SPARK-3333:


Hey [~nchammas], unfortunately we couldn't reproduce this at a smaller scale.

The reproduction here is a slightly pathological case (24,000 empty tasks), so 
it's not totally clear to me that this would affect any actual workload. Also, 
by default I think Spark launches with a very small amount of driver memory, so 
it might be that GC is happening due to an increase in the amount of memory 
required for tasks, the web UI, or other metadata, and that's why it's slower. 
It would be good to log GC data by setting spark.driver.extraJavaOptions to 
-XX:+PrintGCDetails in spark-defaults.conf.

I'll make a call soon about whether to let this block the release. If we can't 
narrow it down in time, it's not worth holding a bunch of other features for 
this... we can fix issues in a patch release shortly if we find any.
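
For reference, a minimal sketch of the spark-defaults.conf change being suggested (the GC flags are standard JVM options; the extra memory line and its value are only illustrative, not part of the original suggestion):

{code}
# spark-defaults.conf
# Print GC activity from the driver JVM
spark.driver.extraJavaOptions  -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
# Optionally raise driver memory while investigating (value is illustrative)
spark.driver.memory            2g
{code}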

 Large number of partitions causes OOM
 -------------------------------------

 Key: SPARK-3333
 URL: https://issues.apache.org/jira/browse/SPARK-3333
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 1.1.0
 Environment: EC2; 1 master; 1 slave; {{m3.xlarge}} instances
Reporter: Nicholas Chammas
Priority: Blocker

 Here’s a repro for PySpark:
 {code}
 a = sc.parallelize(["Nick", "John", "Bob"])
 a = a.repartition(24000)
 a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y).take(1)
 {code}
 This code runs fine on 1.0.2. It returns the following result in just over a 
 minute:
 {code}
 [(4, 'NickJohn')]
 {code}
 However, when I try this with 1.1.0-rc3 on an identically-sized cluster, it 
 runs for a very, very long time (at least > 45 min) and then fails with 
 {{java.lang.OutOfMemoryError: Java heap space}}.
 Here is a stack trace taken from a run on 1.1.0-rc2:
 {code}
 >>> a = sc.parallelize(["Nick", "John", "Bob"])
 >>> a = a.repartition(24000)
 >>> a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y).take(1)
 14/08/29 21:53:40 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, ip-10-138-29-167.ec2.internal, 46252, 0) with no recent heart beats: 175143ms exceeds 45000ms
 14/08/29 21:53:50 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(10, ip-10-138-18-106.ec2.internal, 33711, 0) with no recent heart beats: 175359ms exceeds 45000ms
 14/08/29 21:54:02 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(19, ip-10-139-36-207.ec2.internal, 52208, 0) with no recent heart beats: 173061ms exceeds 45000ms
 14/08/29 21:54:13 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(5, ip-10-73-142-70.ec2.internal, 56162, 0) with no recent heart beats: 176816ms exceeds 45000ms
 14/08/29 21:54:22 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(7, ip-10-236-145-200.ec2.internal, 40959, 0) with no recent heart beats: 182241ms exceeds 45000ms
 14/08/29 21:54:40 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(4, ip-10-139-1-195.ec2.internal, 49221, 0) with no recent heart beats: 178406ms exceeds 45000ms
 14/08/29 21:54:41 ERROR Utils: Uncaught exception in thread Result resolver thread-3
 java.lang.OutOfMemoryError: Java heap space
 at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296)
 at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
 at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
 at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
 at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
 at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:162)
 at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
 at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:514)
 at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:355)
 at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:68)
 at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
 at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
 at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Exception in thread Result resolver thread-3 14/08/29 21:56:26 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(ip-10-73-142-223.ec2.internal,54014)
 java.nio.channels.ClosedChannelException
 at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
 at 

[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-09-02 Thread Patrick Wendell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14118593#comment-14118593
 ] 

Patrick Wendell commented on SPARK-3333:


Hey [~nchammas], one other thing: if you go back to the original job that was 
causing this issue, can you see any regression there? This benchmark is 
pathological enough that it would be good to confirm whether there is actually 
an issue in Spark.


[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-09-02 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14118695#comment-14118695
 ] 

Josh Rosen commented on SPARK-3333:
---

I was unable to reproduce this on a cluster with two *r3*.xlarge instances 
(trying on *m3* shortly) using the following script:

{code}
from pyspark import SparkContext

sc = SparkContext(appName="test")
a = sc.parallelize(["Nick", "John", "Bob"])
a = a.repartition(24000)
parallelism = sc.defaultParallelism
a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y, parallelism).take(1)
{code}

In both 1.0.2 and 1.1.0-rc3, the job ran in ~1 minute 10 seconds (with 
essentially no timing difference between the two versions).  I'm going to try 
this again on m3 instances and double-check both configurations.


[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-09-02 Thread Nicholas Chammas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14118735#comment-14118735
 ] 

Nicholas Chammas commented on SPARK-3333:
-

{quote}
If we can't narrow it down in time, it's not worth holding a bunch of other 
features for this... we can fix issues in a patch release shortly if we find 
any.
{quote}

Sounds fine to me. I'm trying to retrace how I originally stumbled on this but 
am currently tied up with other stuff. I'll report back when I can. It looks 
like the issue is limited to some specific setup that Josh is slowly homing in 
on, which is great.

Just FYI Josh, if it matters: I've been running the little benchmarks reported 
here in the PySpark shell, not via spark-submit.


[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-09-02 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14118775#comment-14118775
 ] 

Josh Rosen commented on SPARK-3333:
---

Tried this on an m3.xlarge cluster (1 master, 1 worker) using the same script 
from my previous comment.  On this cluster, 1.1.0 was noticeably slower.

In v1.0.2-rc1, it took ~220 seconds.
In v1.1.0-rc3, it took ~360 seconds.

Going to run again and keep the logs to see if I can spot where the extra time 
is coming from.


[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-09-02 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14118826#comment-14118826
 ] 

Josh Rosen commented on SPARK-3333:
---

360/220 is approximately 1.6.  Using Splunk, I computed the average task 
execution time from these logs.  It looks like 1.1.0 took around 60ms per task, 
while 1.0.2 only took 37ms, and 60/37 is also about 1.6 (the standard 
deviations appear to be in the same ballpark, too).  It seems that the tasks 
themselves are running slower on 1.1.0, at least according to these logs.
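
As a quick sanity check on those ratios (pure arithmetic on the numbers quoted above, not a new measurement):

{code}
print(360.0 / 220.0)  # ~1.64: whole-job slowdown on the m3.xlarge cluster
print(60.0 / 37.0)    # ~1.62: per-task slowdown from the averaged log timings
{code}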


[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-09-02 Thread Nicholas Chammas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14119119#comment-14119119
 ] 

Nicholas Chammas commented on SPARK-3333:
-

Just to double check my results, I re-ran my earlier test with the same setup 
as in Josh's most recent report. 

Setup:
* 2 clusters, each with 1 master and 1 slave
* all instances are {{m3.xlarge}}
* one cluster is on 1.0.2 and the other is on 1.1.0-rc3
* AWS region is {{us-east-1c}}

Here are my results for running the following script in the PySpark shell, 
restarting the shell between runs.

Script:
{code}
a = sc.parallelize(["Nick", "John", "Bob"])
a = a.repartition(24000)
a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y, sc.defaultParallelism).take(1)
{code}

Results (3 runs on each cluster):
|| 1.0.2 || 1.1.0-rc3 ||
| 62s | 356s |
| 63s | 358s |
| 63s | 365s |

The slowdown here is roughly 356/62 ~= 5.7x.

So the results I'm seeing here are similar to the ones I got in my last test. I 
don't know why Josh's test didn't show as drastic a performance difference. 
I'll attach the driver logs for the record. I'll also try on some {{r3.xlarge}} 
clusters, like Josh did, for comparison.

As has been discussed, I agree this doesn't look like a blocking issue for the 
1.1.0 release, but I do think there is a real issue here (though perhaps not an 
important one). I have not been testing on the 20-node cluster that originally 
yielded the OOM reported here, but will do so later this week if I have time.


[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-09-02 Thread Nicholas Chammas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14119177#comment-14119177
 ] 

Nicholas Chammas commented on SPARK-3333:
-

So I've repeated the tests with the exact same setup described in my previous 
comment, except this time the instances are {{r3.xlarge}}. 

Here are my results:

|| 1.0.2 || 1.1.0-rc3 ||
| 74s | 66s |
| 69s | 67s |
| 72s | 67s |

So this agrees with Josh's earlier results and suggests this may be something 
specific to {{m3}} PV instances. Josh is exploring that in more detail in 
[SPARK-3358].

In conclusion, once you control for the number of reducers, there should be no 
performance degradation (with the m3/PV caveats Josh is investigating).

Perhaps for this JIRA issue all we need is some documentation of the change in 
the default number of reducers in PySpark from 1.0 to 1.1 and we're good to go?


[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-09-01 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14117062#comment-14117062
 ] 

Josh Rosen commented on SPARK-3333:
---

[~shivaram] and I discussed this; we have a few ideas about what might be 
happening.

I tried running {{sc.parallelize(1 to 10).repartition(24000).keyBy(x => x).reduceByKey(_+_).collect()}} 
in 1.0.2 and observed a similarly slow speed to what I saw in the current 1.1.0 
release candidate.  When I modified my job to use fewer reducers 
({{reduceByKey(_+_, 4)}}), the job completed quickly.  You can see similar 
behavior in Python by explicitly specifying a smaller number of reducers.

I think the issue here is that the overhead of sending and processing task 
completions grows as O(numReducers).  Specifically, the uncompressed size of 
ShuffleMapTask results is roughly O(numReducers), and there's an O(numReducers) 
processing cost for task completions within DAGScheduler (since 
mapOutputLocations is O(numReducers)).

This normally isn't a problem, but it can impact performance for jobs with 
large numbers of extremely small map tasks (like this job, where nearly all of 
the map tasks are effectively no-ops).  For larger tasks, this cost should be 
masked by larger overheads (such as task processing time).

I'm not sure where the OOM is coming from, but the slow performance that you're 
observing here is probably due to the new default number of reducers 
(https://github.com/apache/spark/pull/1138 exposed this in Python by changing 
its defaults to match Scala Spark).

As a result, I'm not sure that this is a regression from 1.0.2, since it 
behaves similarly for Scala jobs.  I think we already do some compression of 
the task results and there are probably other improvements that we can make to 
lower these overheads, but I think we should postpone that to 1.2.0.
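
A minimal PySpark sketch of that workaround (the reducer count of 4 is illustrative; anything much smaller than 24000 shows the same effect):

{code}
# Same repro as above, but cap the number of reducers explicitly
a = sc.parallelize(["Nick", "John", "Bob"])
a = a.repartition(24000)
a.keyBy(lambda x: len(x)).reduceByKey(lambda x, y: x + y, 4).take(1)
{code}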


[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-09-01 Thread Patrick Wendell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14117770#comment-14117770
 ] 

Patrick Wendell commented on SPARK-3333:


[~nchammas], I think the default number of reducers could be the culprit. If 
you look at the actual job run in Spark 1.0.2, how many reducers are there in 
the shuffle? And what happens if you run the code in Spark 1.1.0 and specify a 
number of reducers that matches the number chosen in 1.0.2: is the performance 
then the same?

If this is the case, we should probably document it clearly in the release 
notes, since changing this default could have major implications for those 
relying on the default behavior.


[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-09-01 Thread Nicholas Chammas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14117853#comment-14117853
 ] 

Nicholas Chammas commented on SPARK-3333:
-

It looks like the default number of reducers does indeed explain most of the 
performance difference here, but there is still a significant difference even 
after controlling for this variable.

I have 2 identical EC2 clusters as described in this JIRA issue, one on 1.0.2 
and one on 1.1.0-rc3. This time I ran the following PySpark code:

{code}
a = sc.parallelize(["Nick", "John", "Bob"])
a = a.repartition(24000)
a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y, sc.defaultParallelism).take(1)
{code}

Here are my results for 3 runs on each cluster:
||1.0.2||1.1.0-rc3||
| 95s | 343s |
| 89s | 336s |
| 95s | 334s |

So manually setting the number of reducers to a smaller number does help a lot, 
but there is still a 3-4x performance slowdown.

Can anyone else replicate this result?


[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-09-01 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14117905#comment-14117905
 ] 

Josh Rosen commented on SPARK-3333:
---

Still investigating.  I tried this on my laptop by running the following script 
through spark-submit:

{code}
from pyspark import SparkContext

sc = SparkContext(appName="test")
a = sc.parallelize(["Nick", "John", "Bob"])
a = a.repartition(1)
parallelism = 4
a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y, parallelism).take(1)
{code}

With spark-1.0.2-bin-hadoop1, this ran in ~46 seconds, while branch-1.1 took 
~30 seconds.  Spark 1.0.2 seemed to experience one long pause that might have 
been due to GC, but I'll have to measure that.  Both ran to completion without 
crashing.  I'll see what happens if I bump up the number of partitions.
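
For reference, such a script would be launched roughly like this (the file name repro.py and the master setting are assumptions, not taken from the original comment):

{code}
./bin/spark-submit --master "local[4]" repro.py
{code}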


[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-08-31 Thread Nicholas Chammas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14116895#comment-14116895
 ] 

Nicholas Chammas commented on SPARK-3333:
-

Note: I have not yet confirmed that 1.1.0-rc3 yields the exact same stack trace 
as the one provided above (which is for 1.1.0-rc2), though I expect them to be 
the same. I _can_ confirm that it takes a very, very long time to run, as it is 
running right now on rc3 and has been for about 45 minutes. Since I have to be 
offline for a bit, I thought I'd report this issue ASAP with the rc2 stack 
trace and update it later with a stack trace from rc3.

[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-08-31 Thread Patrick Wendell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14116902#comment-14116902
 ] 

Patrick Wendell commented on SPARK-3333:


Hey [~nchammas] - I don't think anything relevant to this issue has changed 
between RC2 and RC3, so the RC2 trace is probably sufficient.

[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-08-31 Thread Matei Zaharia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14116923#comment-14116923
 ] 

Matei Zaharia commented on SPARK-3333:
--

The slowdown might be partly due to adding external spilling in Python, but 
it's weird that this would crash the driver.
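One way to probe that hypothesis would be to rerun the repro with spilling disabled and compare. Below is a minimal sketch, under the assumption that the {{spark.shuffle.spill}} setting also governs the new Python-side spilling path in this build; treat that key and the app name as assumptions, not something verified against 1.1.0-rc3:

{code}
from pyspark import SparkConf, SparkContext

# Sketch only: rerun the repro with external spilling turned off, to check
# whether the slowdown/OOM is tied to the new Python spilling code path.
# The config key below is an assumption about the 1.1 configuration surface.
conf = (SparkConf()
        .setAppName("spill-toggle-test")
        .set("spark.shuffle.spill", "false"))

sc = SparkContext(conf=conf)
a = sc.parallelize(["Nick", "John", "Bob"]).repartition(24000)
print(a.keyBy(lambda x: len(x)).reduceByKey(lambda x, y: x + y).take(1))
sc.stop()
{code}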

[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-08-31 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14116936#comment-14116936
 ] 

Josh Rosen commented on SPARK-3333:
---

I agree with Davies; I think this is a more general Spark issue, perhaps 
related to {{repartition()}}.

I just tried testing this locally at commit 
eff9714e1c88e39e28317358ca9ec87677f121dc, the commit immediately prior to 
[14174abd421318e71c16edd24224fd5094bdfed4|https://github.com/apache/spark/commit/14174abd421318e71c16edd24224fd5094bdfed4] 
(Davies' patch that adds hash-based disk-spilling aggregation to PySpark), and 
I still saw the same slowdown there.
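If {{repartition()}} is the suspect, a minimal sketch (mine, with an arbitrary app name) for isolating it is to run only the shuffle it introduces, with no {{reduceByKey()}} at all:

{code}
from pyspark import SparkContext

# Sketch: trigger only the 24,000-task shuffle created by repartition(),
# with no reduce stage, to see whether the slowdown already appears here.
sc = SparkContext(appName="repartition-only-test")
a = sc.parallelize(["Nick", "John", "Bob"])
print(a.repartition(24000).count())
sc.stop()
{code}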



[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-08-31 Thread Matei Zaharia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14116942#comment-14116942
 ] 

Matei Zaharia commented on SPARK-:
--

I see, that makes sense.

[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-08-31 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14116948#comment-14116948
 ] 

Josh Rosen commented on SPARK-3333:
---

I'm doing some manual bisecting to find the patch that introduced the 
slowdown.  It's still slow as far back as 8d338f64c4eda45d22ae33f61ef7928011cc2846.

[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-08-31 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14116969#comment-14116969
 ] 

Josh Rosen commented on SPARK-3333:
---

Looks like the issue was introduced somewhere between 273afcb and 62d4a0f.

[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-08-31 Thread Nicholas Chammas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14116978#comment-14116978
 ] 

Nicholas Chammas commented on SPARK-3333:
-

For the record, I got the OOM in a relatively short amount of time (less than 
30 min) on a 1.1.0-rc2 EC2 cluster with 20 {{m1.xlarge}} slaves. Perhaps one of 
y'all can replicate the OOM with that kind of environment.

[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-08-31 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14116985#comment-14116985
 ] 

Josh Rosen commented on SPARK-3333:
---

I'll resume work on this later tonight, but just wanted to note that things run 
fast as recently as commit 5ad5e34 and slow down as long ago as 6587ef7.

[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-08-31 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14117016#comment-14117016
 ] 

Josh Rosen commented on SPARK-3333:
---

It looks like https://github.com/apache/spark/pull/1138 may be the culprit, 
since this job runs quickly immediately prior to that commit.

[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-08-31 Thread Davies Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14117019#comment-14117019
 ] 

Davies Liu commented on SPARK-3333:
---

@joshrosen This should not be the culprit; it just exposes the problem in 
PySpark. Before that change, the default number of partitions for 
reduceByKey() could be much smaller, such as 4.

The root cause should be on the Scala side, so it would be better to reproduce 
this with the Scala API.
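To make that concrete, here is a hedged PySpark illustration (not from the thread) of the difference being described: without an explicit partition count the reduce side can inherit the parent's 24,000 partitions, while passing {{numPartitions}} keeps the number of reduce tasks small:

{code}
from pyspark import SparkContext

# Illustration only: compare an implicit reduce-side partition count with an
# explicitly capped one. The numbers are illustrative.
sc = SparkContext(appName="reduce-parallelism-test")

pairs = (sc.parallelize(["Nick", "John", "Bob"])
           .repartition(24000)
           .keyBy(lambda x: len(x)))

# Implicit: the reduce stage may use as many partitions as the parent RDD,
# i.e. up to 24,000 reduce tasks.
many = pairs.reduceByKey(lambda x, y: x + y)

# Explicit: cap the reduce-side parallelism at 4 partitions.
few = pairs.reduceByKey(lambda x, y: x + y, numPartitions=4)

print(few.take(1))
sc.stop()
{code}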

[jira] [Commented] (SPARK-3333) Large number of partitions causes OOM

2014-08-31 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14117021#comment-14117021
 ] 

Josh Rosen commented on SPARK-3333:
---

Good point.  I guess culprit was the wrong word, but that commit helps to 
narrow down the problem.
