Re: performance improvement on second operation...without caching?

2014-05-05 Thread Diana Carroll
Ethan, you're not the only one, which is why I was asking about this! :-)

Matei, thanks for your response. Your answer explains the performance jump
in my code, but shows I've missed something key in my understanding of
Spark!

I was not aware until just now that map output was saved to disk (other
than when explicitly told to do so using persist).  It raises almost as many
questions as it answers.

Where are the shuffle files saved?  Locally on the mapper nodes?  Is it the
same location that disk-spilled cache data is saved to?  Doesn't the necessity
of saving to disk result in increased I/O that would slow the job down?  I
thought part of the goal of Spark was to do everything in memory unless the
user specifically chose to persist, thereby making a choice to incur
time/disk-space expense up front in return for fast failure recovery?
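
(A minimal sketch of the knob involved, assuming the standard spark.local.dir
property is what governs this scratch space; as I understand it, it covers
both map output files and RDD blocks spilled to disk, and it is local to each
worker node:)

from pyspark import SparkConf, SparkContext

# hypothetical illustration: point Spark's per-node scratch space (shuffle
# files, disk-spilled RDD blocks) at an explicit local directory on each worker
conf = SparkConf() \
    .setAppName("activation-counts") \
    .set("spark.local.dir", "/mnt/spark-scratch")
sc = SparkContext(conf=conf)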

Not that I'm complaining, mind you, but I do think people should be made
aware of this: it not only affects performance, but also, for instance, whether
the data is fresh or out of date.  I had assumed that if I did not set caching,
that each time I performed an operation on an RDD, it would re-compute
based on lineage, including re-reading the files...so I didn't have to
worry about the possibility of my file content changing.  But if it's
auto-caching shuffle files, my base files won't get re-read even if the
content has changed. (Or does it check timestamps?)
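
A rough sketch of the distinction, assuming (as Matei describes below) that
the shuffle-output reuse is tied to reusing the same RDD object rather than
to the input path:

path = "hdfs://localhost/user/training/activations/*.xml"

counts = sc.textFile(path).map(lambda line: (len(line), 1)).reduceByKey(lambda a, b: a + b)
counts.count()   # runs the map stage and writes shuffle files
counts.count()   # same RDD object: map stage skipped, shuffle output reused, files NOT re-read

fresh = sc.textFile(path).map(lambda line: (len(line), 1)).reduceByKey(lambda a, b: a + b)
fresh.count()    # new lineage: the map stage runs again, so the files are re-read from HDFS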

Thanks,
Diana








On Mon, May 5, 2014 at 11:07 AM, Ethan Jewett esjew...@gmail.com wrote:

 Thanks Patrick and Matei for the clarification. I actually have to update
 some code now, as I was apparently relying on the fact that the output
 files are being re-used. Explains some edge-case behavior that I've seen.

 For me, at least, I read the guide, did some tests on fairly extensive RDD
 dependency graphs, saw that tasks earlier in the dependency graphs were not
 being regenerated, and assumed (very much incorrectly, I just found out!)
 that it was because the RDDs themselves were being cached. I wonder if
 there is a way to explain this distinction concisely in the programming
 guide. Or maybe I'm the only one who went down this incorrect learning
 path :-)

 Ethan


 On Sun, May 4, 2014 at 12:05 AM, Matei Zaharia matei.zaha...@gmail.com wrote:

 Yes, this happens as long as you use the same RDD. For example say you do
 the following:

 data1 = sc.textFile(…).map(…).reduceByKey(…)
 data1.count()
 data1.filter(…).count()

 The first count() causes outputs of the map/reduce pair in there to be
 written out to shuffle files. Next time you do a count, on either this RDD
 or a child (e.g. after the filter), we notice that output files were
 already generated for this shuffle so we don’t rerun the map stage. Note
 that the output does get read again over the network, which is kind of
 wasteful (if you really wanted to reuse this as quickly as possible you’d
 use cache()).
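
 For comparison, a minimal sketch of the cached variant (hypothetical
 pipeline; the point is just that after cache() the second action reads
 in-memory partitions instead of fetching shuffle output):

 pairs = sc.textFile("hdfs:///tmp/events.txt") \
           .map(lambda line: (line.split(",")[0], 1)) \
           .reduceByKey(lambda a, b: a + b)
 pairs.cache()                               # keep the computed partitions in memory
 pairs.count()                               # first action: runs map/reduce and fills the cache
 pairs.filter(lambda kv: kv[1] > 1).count()  # second action: served from the cache, no shuffle fetch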

 Matei

 On May 3, 2014, at 8:44 PM, Koert Kuipers ko...@tresata.com wrote:

 Hey Matei,
 Not sure I understand that. These are 2 separate jobs. So the second job
 takes advantage of the fact that there is map output left somewhere on disk
 from the first job, and re-uses that?


 On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

 Hi Diana,

 Apart from these reasons, in a multi-stage job, Spark saves the map
 output files from map stages to the filesystem, so it only needs to rerun
 the last reduce stage. This is why you only saw one stage executing. These
 files are saved for fault recovery but they speed up subsequent runs.

 Matei

 On May 3, 2014, at 5:21 PM, Patrick Wendell pwend...@gmail.com wrote:

 Ethan,

 What you said is actually not true: Spark won't cache RDDs unless you
 ask it to.

 The observation here - that running the same job can speed up
 substantially even without caching - is common. This is because other
 components in the stack are performing caching and optimizations. Two that
 can make a huge difference are:

 1. The OS buffer cache, which keeps recently read disk blocks in memory.
 2. The Java just-in-time (JIT) compiler, which uses runtime profiling to
 significantly speed up execution.

 These can make a huge difference if you are running the same job over and
 over. And there are other things, like the OS network stack increasing TCP
 windows and so forth. These will all improve response time as a Spark
 program executes.
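
 A minimal sketch of how one might observe that warm-up effect without any
 Spark-level caching (hypothetical timing helper; each call builds a fresh
 RDD, so only the OS page cache and the JIT are in play):

 import time

 def timed_count(path):
     # time one uncached pass over the input; a fresh RDD is built on every call
     start = time.time()
     n = sc.textFile(path).count()
     print "count=%d took %.1fs" % (n, time.time() - start)

 timed_count("hdfs://localhost/user/training/activations/*.xml")  # cold run: disk reads, JIT still warming up
 timed_count("hdfs://localhost/user/training/activations/*.xml")  # warm run: OS buffer cache hits, JIT-compiled hot paths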


 On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett esjew...@gmail.com wrote:

 I believe Spark caches RDDs it has memory for regardless of whether you
 actually call the 'cache' method on the RDD. The 'cache' method just tips
 off Spark that the RDD should have higher priority. At least, that is my
 experience and it seems to correspond with your experience and with my
 recollection of other discussions on this topic on the list. However, going
 back and looking at the programming guide, this is not the way the
 cache/persist behavior is described. Does the guide need to be updated?

Re: performance improvement on second operation...without caching?

2014-05-03 Thread Matei Zaharia
Hi Diana,

Apart from these reasons, in a multi-stage job, Spark saves the map output 
files from map stages to the filesystem, so it only needs to rerun the last 
reduce stage. This is why you only saw one stage executing. These files are 
saved for fault recovery but they speed up subsequent runs.

Matei

On May 3, 2014, at 5:21 PM, Patrick Wendell pwend...@gmail.com wrote:

 Ethan,
 
 What you said is actually not true: Spark won't cache RDDs unless you ask it
 to.
 
 The observation here - that running the same job can speed up substantially 
 even without caching - is common. This is because other components in the 
 stack are performing caching and optimizations. Two that can make a huge 
 difference are:
 
 1. The OS buffer cache, which keeps recently read disk blocks in memory.
 2. The Java just-in-time (JIT) compiler, which uses runtime profiling to
 significantly speed up execution.

 These can make a huge difference if you are running the same job over and
 over. And there are other things, like the OS network stack increasing TCP
 windows and so forth. These will all improve response time as a Spark
 program executes.
 
 
 On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett esjew...@gmail.com wrote:
 I believe Spark caches RDDs it has memory for regardless of whether you 
 actually call the 'cache' method on the RDD. The 'cache' method just tips off 
 Spark that the RDD should have higher priority. At least, that is my 
 experience and it seems to correspond with your experience and with my 
 recollection of other discussions on this topic on the list. However, going 
 back and looking at the programming guide, this is not the way the 
 cache/persist behavior is described. Does the guide need to be updated?
 
 
 On Fri, May 2, 2014 at 9:04 AM, Diana Carroll dcarr...@cloudera.com wrote:
 I'm just Posty McPostalot this week, sorry folks! :-)
 
 Anyway, another question today:
 I have a bit of code that is pretty time consuming (pasted at the end of the 
 message):
 It reads in a bunch of XML files, parses them, extracts some data in a map, 
 counts (using reduce), and then sorts.   All stages are executed when I do a 
 final operation (take).  The first stage is the most expensive: on first run 
 it takes 30s to a minute.
 
 I'm not caching anything.
 
 When I re-execute that take at the end, I expected it to re-execute all the 
 same stages, and take approximately the same amount of time, but it didn't.  
 The second take executes only a single stage, which runs very fast: the
 whole operation takes less than 1 second (down from 5 minutes!)
 
 While this is awesome (!) I don't understand it.  If I'm not caching data, 
 why would I see such a marked performance improvement on subsequent execution?
 
 (or is this related to the known 0.9.1 bug about sortByKey executing an action
 when it shouldn't?)
 
 Thanks,
 Diana
 
 # load XML files containing device activation records.
 # Find the most common device models activated
 import xml.etree.ElementTree as ElementTree

 # Given a partition containing multi-line XML, parse the contents.
 # Return an iterator of activation Elements contained in the partition
 def getactivations(fileiterator):
     s = ''
     for i in fileiterator: s = s + str(i)
     filetree = ElementTree.fromstring(s)
     return filetree.getiterator('activation')

 # Get the model name from a device activation record
 def getmodel(activation):
     return activation.find('model').text

 filename = "hdfs://localhost/user/training/activations/*.xml"

 # parse each partition as a file into an activation XML record
 activations = sc.textFile(filename)
 activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
 models = activationTrees.map(lambda activation: getmodel(activation))

 # count and sort activations by model
 topmodels = models.map(lambda model: (model,1))\
     .reduceByKey(lambda v1,v2: v1+v2)\
     .map(lambda (model,count): (count,model))\
     .sortByKey(ascending=False)

 # display the top 10 models
 for (count,model) in topmodels.take(10):
     print "Model %s (%s)" % (model,count)

 # repeat!
 for (count,model) in topmodels.take(10):
     print "Model %s (%s)" % (model,count)
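
 Tying this back to the cache() suggestion elsewhere in the thread: if the
 goal were the fastest possible repeat of the take, a hedged variant is to
 persist the final RDD before the first action, e.g.:

 topmodels.cache()                     # keep whatever partitions the take computes in memory
 for (count,model) in topmodels.take(10):
     print "Model %s (%s)" % (model,count)

 # the repeat now reads cached partitions instead of recomputing or fetching shuffle output
 for (count,model) in topmodels.take(10):
     print "Model %s (%s)" % (model,count)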
 
 
 



Re: performance improvement on second operation...without caching?

2014-05-03 Thread Koert Kuipers
Hey Matei,
Not sure I understand that. These are 2 separate jobs. So the second job
takes advantage of the fact that there is map output left somewhere on disk
from the first job, and re-uses that?


On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

 Hi Diana,

 Apart from these reasons, in a multi-stage job, Spark saves the map output
 files from map stages to the filesystem, so it only needs to rerun the last
 reduce stage. This is why you only saw one stage executing. These files are
 saved for fault recovery but they speed up subsequent runs.

 Matei

 On May 3, 2014, at 5:21 PM, Patrick Wendell pwend...@gmail.com wrote:

 Ethan,

 What you said is actually not true: Spark won't cache RDDs unless you ask
 it to.

 The observation here - that running the same job can speed up
 substantially even without caching - is common. This is because other
 components in the stack are performing caching and optimizations. Two that
 can make a huge difference are:

 1. The OS buffer cache, which keeps recently read disk blocks in memory.
 2. The Java just-in-time (JIT) compiler, which uses runtime profiling to
 significantly speed up execution.

 These can make a huge difference if you are running the same job over and
 over. And there are other things, like the OS network stack increasing TCP
 windows and so forth. These will all improve response time as a Spark
 program executes.


 On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett esjew...@gmail.com wrote:

 I believe Spark caches RDDs it has memory for regardless of whether you
 actually call the 'cache' method on the RDD. The 'cache' method just tips
 off Spark that the RDD should have higher priority. At least, that is my
 experience and it seems to correspond with your experience and with my
 recollection of other discussions on this topic on the list. However, going
 back and looking at the programming guide, this is not the way the
 cache/persist behavior is described. Does the guide need to be updated?


 On Fri, May 2, 2014 at 9:04 AM, Diana Carroll dcarr...@cloudera.com wrote:

 I'm just Posty McPostalot this week, sorry folks! :-)

 Anyway, another question today:
 I have a bit of code that is pretty time consuming (pasted at the end of
 the message):
 It reads in a bunch of XML files, parses them, extracts some data in a
 map, counts (using reduce), and then sorts.   All stages are executed when
 I do a final operation (take).  The first stage is the most expensive: on
 first run it takes 30s to a minute.

 I'm not caching anything.

 When I re-execute that take at the end, I expected it to re-execute all
 the same stages, and take approximately the same amount of time, but it
 didn't.  The second take executes only a single stage, which runs very
 fast: the whole operation takes less than 1 second (down from 5 minutes!)

 While this is awesome (!) I don't understand it.  If I'm not caching
 data, why would I see such a marked performance improvement on subsequent
 execution?

 (or is this related to the known 0.9.1 bug about sortByKey executing an
 action when it shouldn't?)

 Thanks,
 Diana

 # load XML files containing device activation records.
 # Find the most common device models activated
 import xml.etree.ElementTree as ElementTree

 # Given a partition containing multi-line XML, parse the contents.
 # Return an iterator of activation Elements contained in the partition
 def getactivations(fileiterator):
     s = ''
     for i in fileiterator: s = s + str(i)
     filetree = ElementTree.fromstring(s)
     return filetree.getiterator('activation')

 # Get the model name from a device activation record
 def getmodel(activation):
     return activation.find('model').text

 filename = "hdfs://localhost/user/training/activations/*.xml"

 # parse each partition as a file into an activation XML record
 activations = sc.textFile(filename)
 activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
 models = activationTrees.map(lambda activation: getmodel(activation))

 # count and sort activations by model
 topmodels = models.map(lambda model: (model,1))\
     .reduceByKey(lambda v1,v2: v1+v2)\
     .map(lambda (model,count): (count,model))\
     .sortByKey(ascending=False)

 # display the top 10 models
 for (count,model) in topmodels.take(10):
     print "Model %s (%s)" % (model,count)

 # repeat!
 for (count,model) in topmodels.take(10):
     print "Model %s (%s)" % (model,count)







Re: performance improvement on second operation...without caching?

2014-05-03 Thread Matei Zaharia
Yes, this happens as long as you use the same RDD. For example say you do the 
following:

data1 = sc.textFile(…).map(…).reduceByKey(…)
data1.count()
data1.filter(…).count()

The first count() causes outputs of the map/reduce pair in there to be written 
out to shuffle files. Next time you do a count, on either this RDD or a child 
(e.g. after the filter), we notice that output files were already generated for 
this shuffle so we don’t rerun the map stage. Note that the output does get 
read again over the network, which is kind of wasteful (if you really wanted to 
reuse this as quickly as possible you’d use cache()).

Matei

On May 3, 2014, at 8:44 PM, Koert Kuipers ko...@tresata.com wrote:

 Hey Matei,
 Not sure I understand that. These are 2 separate jobs. So the second job
 takes advantage of the fact that there is map output left somewhere on disk 
 from the first job, and re-uses that?
 
 
 On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 Hi Diana,
 
 Apart from these reasons, in a multi-stage job, Spark saves the map output 
 files from map stages to the filesystem, so it only needs to rerun the last 
 reduce stage. This is why you only saw one stage executing. These files are 
 saved for fault recovery but they speed up subsequent runs.
 
 Matei
 
 On May 3, 2014, at 5:21 PM, Patrick Wendell pwend...@gmail.com wrote:
 
 Ethan,
 
 What you said is actually not true: Spark won't cache RDDs unless you ask
 it to.
 
 The observation here - that running the same job can speed up substantially 
 even without caching - is common. This is because other components in the 
 stack are performing caching and optimizations. Two that can make a huge 
 difference are:
 
 1. The OS buffer cache, which keeps recently read disk blocks in memory.
 2. The Java just-in-time (JIT) compiler, which uses runtime profiling to
 significantly speed up execution.

 These can make a huge difference if you are running the same job over and
 over. And there are other things, like the OS network stack increasing TCP
 windows and so forth. These will all improve response time as a Spark
 program executes.
 
 
 On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett esjew...@gmail.com wrote:
 I believe Spark caches RDDs it has memory for regardless of whether you 
 actually call the 'cache' method on the RDD. The 'cache' method just tips 
 off Spark that the RDD should have higher priority. At least, that is my 
 experience and it seems to correspond with your experience and with my 
 recollection of other discussions on this topic on the list. However, going 
 back and looking at the programming guide, this is not the way the 
 cache/persist behavior is described. Does the guide need to be updated?
 
 
 On Fri, May 2, 2014 at 9:04 AM, Diana Carroll dcarr...@cloudera.com wrote:
 I'm just Posty McPostalot this week, sorry folks! :-)
 
 Anyway, another question today:
 I have a bit of code that is pretty time consuming (pasted at the end of the 
 message):
 It reads in a bunch of XML files, parses them, extracts some data in a map, 
 counts (using reduce), and then sorts.   All stages are executed when I do a 
 final operation (take).  The first stage is the most expensive: on first run 
 it takes 30s to a minute.
 
 I'm not caching anything.
 
 When I re-execute that take at the end, I expected it to re-execute all the 
 same stages, and take approximately the same amount of time, but it didn't.  
 The second take executes only a single stage, which runs very fast: the
 whole operation takes less than 1 second (down from 5 minutes!)
 
 While this is awesome (!) I don't understand it.  If I'm not caching data, 
 why would I see such a marked performance improvement on subsequent 
 execution?
 
 (or is this related to the known 0.9.1 bug about sortByKey executing an
 action when it shouldn't?)
 
 Thanks,
 Diana
 
 # load XML files containing device activation records.
 # Find the most common device models activated
 import xml.etree.ElementTree as ElementTree

 # Given a partition containing multi-line XML, parse the contents.
 # Return an iterator of activation Elements contained in the partition
 def getactivations(fileiterator):
     s = ''
     for i in fileiterator: s = s + str(i)
     filetree = ElementTree.fromstring(s)
     return filetree.getiterator('activation')

 # Get the model name from a device activation record
 def getmodel(activation):
     return activation.find('model').text

 filename = "hdfs://localhost/user/training/activations/*.xml"

 # parse each partition as a file into an activation XML record
 activations = sc.textFile(filename)
 activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
 models = activationTrees.map(lambda activation: getmodel(activation))

 # count and sort activations by model
 topmodels = models.map(lambda model: (model,1))\
     .reduceByKey(lambda v1,v2: v1+v2)\