I'm running Spark locally on my laptop to explore how persistence impacts
memory use. As an example problem, I generate 80 MB matrices in NumPy and
then simply add them together.
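For scale, a quick sanity check of the sizes involved (my own rough sketch; I'm reading "80 MB" as roughly 10 million float64 values, which is an assumption on my part):

```python
import numpy as np

SIZE_N = 10 * 1000 * 1000              # assumed: 10M float64 values per matrix
matrix = np.random.normal(0, 1, SIZE_N)

per_matrix_bytes = matrix.nbytes        # 10M * 8 bytes = 80,000,000 bytes (~80 MB)

NUM = 100                               # number of matrices I persist
total_bytes = NUM * matrix.nbytes       # ~8 GB if everything is held at once
```

So if all NUM partitions' outputs really get materialized, that's on the order of 8 GB, well past my 4 GB executor setting.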
No matter what I set NUM or the persistence level to in the code below, I
get out-of-memory errors like the ones in this gist:
https://gist.github.com/ericmjonas/fe062a118232e14d2076
I've tried varying my serializer buffer size, my worker memory, etc., for
example:
SPARK_DRIVER_MEMORY=4g src/spark-1.1.0-bin-cdh4/bin/spark-submit --conf
spark.kryoserializer.buffer.mb=512 --conf spark.akka.frameSize=1000 --conf
spark.executor.memory=4g --conf spark.python.worker.memory=4g --master
local[2] persistlevels.py
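For reference, I've also tried putting the same settings in conf/spark-defaults.conf (same property names as the --conf flags above), with no change in behavior:

```
spark.kryoserializer.buffer.mb   512
spark.akka.frameSize             1000
spark.executor.memory            4g
spark.python.worker.memory       4g
```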
If I don't try to persist, the job of course completes, as Spark is smart
enough to sequentially create-reduce, create-reduce, and do the right thing.
Is there something I'm missing here? Why should persistence make the
simulation crash, especially with a high number of partitions?
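To spell out what I mean by "doing the right thing": without persist, the reduce can consume one matrix at a time, while persisting keeps every partition's output alive at once. Here is a plain-Python/NumPy analogy of the two behaviors (not actual Spark code, and with a tiny SIZE_N just for illustration):

```python
import numpy as np

SIZE_N = 1000  # tiny here; the real run uses ~80 MB matrices

def create_data(index, size_n=SIZE_N):
    np.random.seed(index)
    return np.random.normal(0, 1, size_n)

# Streaming (no persist): only one freshly created matrix is alive at a time,
# so peak memory stays around one matrix plus the accumulator.
acc = create_data(0)
for i in range(1, 10):
    acc = acc + create_data(i)  # previous matrices can be garbage collected

# Persisted: all matrices are materialized up front, so peak memory is
# roughly NUM times larger before the reduction even starts.
cached = [create_data(i) for i in range(10)]
acc2 = cached[0]
for m in cached[1:]:
    acc2 = acc2 + m
```

Both orderings produce the same sum; the difference is only in how many matrices are held in memory simultaneously.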
Thanks for any and all help,
...Eric
import sys
from random import random
from operator import add
import numpy as np
import time

import pyspark
from pyspark import SparkContext, StorageLevel
"""
Test what the persistence levels mean for Spark.

Exploit the fact that when running locally we can write to
temp files.
"""
ROOT_LOG_NAME = "/tmp/create_data.%d.%d"

def create_data(SIZE_N, index):
    # log which partitions actually get (re)computed, exploiting the
    # local filesystem
    fid = open(ROOT_LOG_NAME % (SIZE_N, index), 'w')
    fid.write("%03d creating\n" % index)
    fid.close()

    np.random.seed(index)
    d = np.random.normal(0, 1, SIZE_N)
    return d
if __name__ == "__main__":
    # Usage: pi [slices]
    sc = SparkContext(appName="PersistTest", batchSize=1)

    def sum_two_mat(a, b):
        print "Summing matrix"
        return a + b

    def sum_two_mat_p1(a, b):
        print "Summing matrix + 1"
        return a + b + 1
    SIZE_N = 10 * 1000 * 1000  # 10M float64 values ~ 80 MB per matrix
    NUM = 100

    a = sc.parallelize(xrange(NUM), 10) \
          .map(lambda x: create_data(SIZE_N, x)) \
          .persist(StorageLevel.MEMORY_AND_DISK)

    # two different reductions to test if we double-generate
    b = a.reduce(sum_two_mat)
    c = a.reduce(sum_two_mat)

    print np.sum(b + c)

    sc.stop()