Meaning of persistence levels -- setting persistence causing out of memory errors with pyspark

2014-10-27 Thread Eric Jonas
I'm running spark locally on my laptop to explore how persistence impacts
memory use. I'm generating 80 MB matrices in numpy and then simply adding
them as an example problem.


No matter what I set NUM or the persistence level to in the code below, I
get out-of-memory errors like the ones in this gist (
https://gist.github.com/ericmjonas/fe062a118232e14d2076 ). I've tried
varying the serializer buffer size, the worker memory, and so on, for example:

SPARK_DRIVER_MEMORY=4g src/spark-1.1.0-bin-cdh4/bin/spark-submit \
    --conf spark.kryoserializer.buffer.mb=512 \
    --conf spark.akka.frameSize=1000 \
    --conf spark.executor.memory=4g \
    --conf spark.python.worker.memory=4g \
    --master local[2] persistlevels.py

If I don't try to persist, the job of course completes, since Spark is smart
enough to sequentially create-reduce-create-reduce and do the right thing.

Is there something I'm missing here? Why should persisting make the
simulation crash, especially with a high number of partitions?
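
For concreteness, the non-persisted version that does complete is sketched
below (using create_data, SIZE_N, and NUM from the script further down).
Each array is generated, folded into the running sum, and freed, so peak
memory stays at roughly one array per active task:

rdd = sc.parallelize(xrange(NUM), 10).map(lambda x: create_data(SIZE_N, x))
total = rdd.reduce(lambda a, b: a + b)  # no persist(): streaming reduce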

Thanks for any and all help,

...Eric


import sys
from random import random
from operator import add
import numpy as np
import time

import pyspark
from pyspark import SparkContext, StorageLevel


"""
Test what the persistence levels mean for Spark.

Exploit the fact that when running locally we can write to
temp files.
"""

ROOT_LOG_NAME = "/tmp/create_data.%d.%d"

def create_data(SIZE_N, index):
    # Log each invocation to a temp file so we can see how many
    # times each element actually gets generated.
    fid = open(ROOT_LOG_NAME % (SIZE_N, index), 'w')
    fid.write("%03d creating\n" % index)
    fid.close()
    np.random.seed(index)
    d = np.random.normal(0, 1, SIZE_N)
    return d


if __name__ == "__main__":

    # Usage: persistlevels.py

    sc = SparkContext(appName="PersistTest", batchSize=1)

    def sum_two_mat(a, b):
        print "Summing matrix"
        return a + b

    def sum_two_mat_p1(a, b):
        print "Summing matrix + 1"
        return a + b + 1

    SIZE_N = 1000 * 1000 * 10  # 1e7 float64 values: one ~80 MB matrix

    NUM = 100
    a = sc.parallelize(xrange(NUM), 10) \
          .map(lambda x: create_data(SIZE_N, x)) \
          .persist(StorageLevel.MEMORY_AND_DISK)

    # two separate reductions to test if we double-generate
    b = a.reduce(sum_two_mat)
    c = a.reduce(sum_two_mat)
    print np.sum(b + c)

    sc.stop()
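
For reference, a small helper sketch for checking how often create_data
actually ran, by counting the temp log files (glob is from the standard
library; note that create_data would need to open its log in append mode,
'a' rather than 'w', for repeated generations of the same index to show up):

import glob

def count_creation_logs(size_n):
    # One log file exists per index that was generated at least once;
    # with append-mode logging, line counts would also reveal
    # regeneration of the same index.
    return len(glob.glob("/tmp/create_data.%d.*" % size_n))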


disk-backing pyspark rdds?

2014-10-21 Thread Eric Jonas
Hi All!
I'm getting my feet wet with PySpark for the fairly boring case of
doing parameter sweeps for Monte Carlo runs. Each of my functions runs for
a very long time (2h+) and returns a numpy array on the order of ~100 MB.
That is, my Spark applications look like:

def foo(x):
    np.random.seed(x)
    eat_2GB_of_ram()
    take_2h()
    return my_100MB_array

sc.parallelize(np.arange(100)).map(foo).saveAsPickleFile("s3n://blah...")

The resulting RDDs will most likely not fit in memory, but for this use case
I don't really care. I know I can persist RDDs, but is there any way to
disk-back them by default (something analogous to mmap?) so that they don't
create memory pressure in the system at all? With compute taking this long,
the added overhead of disk and network IO is quite minimal.
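
For concreteness, the explicit per-RDD version I know about looks like the
sketch below (StorageLevel.DISK_ONLY keeps partitions on disk only); what I'm
asking for is something like this as a global default:

from pyspark import StorageLevel

# foo is the long-running function sketched above;
# the s3n path is a placeholder.
rdd = sc.parallelize(np.arange(100)).map(foo).persist(StorageLevel.DISK_ONLY)
rdd.saveAsPickleFile("s3n://blah...")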

Thanks!

...Eric Jonas