I've been trying to understand the performance of Datasets (and filters) in 
Spark 2.0. 

I have a Dataset which I've read from a Parquet file and cached in memory 
(deserialized).  It is spread across 8 partitions and consumes a total of 826MB 
of memory on my cluster.  I verified via the Spark UI that the Dataset was 100% 
cached in memory.

I'm using an AWS c3.2xlarge for my 1 worker (8 cores).

There are 108,587,678 records in my cached Dataset (named om).

I run the following command (against this cached Dataset) and it takes 13.56s.

om.filter(textAnnotation => textAnnotation.annotType == "ce:para").count

This returns a count of 1,039,993.

When I look at the explain() for this query, I see the following:

== Physical Plan ==
*Filter <function1>.apply
+- InMemoryTableScan [docId#394, annotSet#395, annotType#396, startOffset#397L, endOffset#398L, annotId#399L, parentId#400L, orig#401, lemma#402, pos#403, xmlId#404], [<function1>.apply]
      +- InMemoryRelation [docId#394, annotSet#395, annotType#396, startOffset#397L, endOffset#398L, annotId#399L, parentId#400L, orig#401, lemma#402, pos#403, xmlId#404], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- Exchange hashpartitioning(docId#394, 8)

...
I was a bit perplexed about why this takes so long, as I had read that Spark 
can filter 1B rows per second on a single CPU.  Granted, my rows are likely 
more complex, but I thought it should take far less than 13+ seconds to filter 
100M rows that had been cached in memory.

So, I modified the above query to the following:

om.filter("annotType == 'ce:para'").count

The query now completes in just over 1s (a huge improvement).

When I do the explain plan for this query, I see the following:

== Physical Plan ==
*Filter (isnotnull(annotType#396) && (annotType#396 = ce:para))
+- InMemoryTableScan [docId#394, annotSet#395, annotType#396, startOffset#397L, endOffset#398L, annotId#399L, parentId#400L, orig#401, lemma#402, pos#403, xmlId#404], [isnotnull(annotType#396), (annotType#396 = ce:para)]
      +- InMemoryRelation [docId#394, annotSet#395, annotType#396, startOffset#397L, endOffset#398L, annotId#399L, parentId#400L, orig#401, lemma#402, pos#403, xmlId#404], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- Exchange hashpartitioning(docId#394, 8)

This is very similar to the first plan, with the notable exception of

*Filter (isnotnull(annotType#396) && (annotType#396 = ce:para))

instead of

*Filter <function1>.apply

I'm guessing the improved performance comes from the fact that a 
TextAnnotation object must be created for every row in the first example (but 
not in the second), although this is not clear from the explain plans.  Is 
that correct?  Or is there some other reason the second approach is 
significantly faster?  I would really like a solid understanding of why the 
second query is so much faster.
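To make my guess concrete, here is a minimal sketch in plain Scala (no Spark; the Array-of-fields "row" is just a stand-in for Spark's internal binary row format) contrasting a typed filter that must build a full object per row with an expression-style filter that only touches the one field it needs:

```scala
// Simplified stand-in for my case class (fewer fields than the real one)
case class TextAnnotation(docId: String, annotType: String, startOffset: Long)

object FilterSketch extends App {
  // Stand-in for the cached, serialized rows (Spark really uses Tungsten binary rows)
  val rows: Seq[Array[Any]] = Seq(
    Array("d1", "ce:para", 0L),
    Array("d2", "ce:sent", 10L),
    Array("d3", "ce:para", 20L)
  )

  // Typed Dataset-style filter: deserialize every row into an object, then test it
  val typedCount = rows.iterator
    .map(r => TextAnnotation(r(0).asInstanceOf[String],
                             r(1).asInstanceOf[String],
                             r(2).asInstanceOf[Long]))
    .count(_.annotType == "ce:para")

  // Expression-style filter: read only the annotType column, no object creation
  val exprCount = rows.count(_(1) == "ce:para")

  println(s"typed=$typedCount expr=$exprCount")  // both count 2 ce:para rows
}
```

Both produce the same answer; the difference I'm imagining is purely the per-row allocation and field copying in the typed path.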

I also want to clarify whether the InMemoryTableScan and InMemoryRelation are 
part of whole-stage code generation.  I'm thinking they aren't, as they aren't 
prefixed by a "*".  If not, is there something I could do to make this part of 
whole-stage code generation?
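For what I understand the "*" to mean, here is a minimal sketch in plain Scala (not Spark's actual generated code) contrasting a chain of per-operator iterators with the single fused loop that whole-stage code generation would emit for the two predicates in my plan:

```scala
object CodegenSketch extends App {
  val annotTypes: Array[String] = Array("ce:para", "ce:sent", "ce:para", "ce:title")

  // Volcano/iterator style: each operator is a separate call through an Iterator
  val chained = annotTypes.iterator
    .filter(_ != null)          // isnotnull(annotType#396)
    .filter(_ == "ce:para")     // annotType#396 = ce:para
    .size

  // Whole-stage style: the same predicates collapsed into one tight loop,
  // with no per-row virtual calls between operators
  var fused = 0
  var i = 0
  while (i < annotTypes.length) {
    val t = annotTypes(i)
    if (t != null && t == "ce:para") fused += 1
    i += 1
  }

  println(s"chained=$chained fused=$fused")  // both find 2 matches
}
```

If that mental model is right, operators without the "*" sit outside the fused loop and exchange rows through the generic iterator interface.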

My goal is to make the above operation as fast as possible.  I could of course 
increase the number of partitions (and the size of my cluster), but I also 
want to solidify my understanding of whole-stage code generation.

Any thoughts/suggestions would be appreciated.  Also, if anyone has found good 
resources that further explain the details of the DAG and whole-stage code 
generation, I would appreciate those as well.

Thanks.

Darin.
