[jira] [Comment Edited] (SPARK-4550) In sort-based shuffle, store map outputs in serialized form

2015-02-08 Thread Sandy Ryza (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-4550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310130#comment-14310130 ]

Sandy Ryza edited comment on SPARK-4550 at 2/8/15 9:07 PM:
---

I got a working prototype and benchmarked the ExternalSorter changes on my 
laptop.

Each run inserts a bunch of records, each an (Int, (10-character string, Int)) 
tuple, into an ExternalSorter and then calls writePartitionedFile.  The 
reported memory size is the sum of the shuffle bytes spilled (mem) metric and 
the remaining size of the collection after insertion has completed.  Results 
are averaged over three runs.

Keep in mind that the primary goal here is to reduce GC pressure, so any speed 
improvements are icing.

||Number of Records||Storing as Serialized||Memory Size (bytes)||Number of Spills||Insert Time (ms)||Write Time (ms)||Total Time (ms)||
|1 million|false|194923217|0|1123|3442|4566|
|1 million|true|48694072|0|1315|2652|3967|
|10 million|false|2050514159|3|26723|17418|44141|
|10 million|true|613614392|1|16501|17151|33652|
|50 million|false|10166122563|17|101831|89960|191791|
|50 million|true|3067937592|5|76801|78361|155161|
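The memory-size column tells most of the story: storing records serialized shrinks the in-memory footprint by roughly 3-4x at every scale. A quick sketch of the arithmetic, with the numbers copied from the table above:

```java
public class FootprintRatio {
    public static void main(String[] args) {
        long[][] rows = {
            {194923217L, 48694072L},     // 1 million records
            {2050514159L, 613614392L},   // 10 million records
            {10166122563L, 3067937592L}, // 50 million records
        };
        for (long[] r : rows) {
            // deserialized footprint / serialized footprint
            System.out.printf("%.2fx%n", (double) r[0] / r[1]);
        }
        // prints 4.00x, 3.34x, 3.31x
    }
}
```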


 In sort-based shuffle, store map outputs in serialized form
 ---

 Key: SPARK-4550
 URL: https://issues.apache.org/jira/browse/SPARK-4550
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Affects Versions: 1.2.0
Reporter: Sandy Ryza
Assignee: Sandy Ryza
Priority: Critical
 Attachments: SPARK-4550-design-v1.pdf


 One drawback of sort-based shuffle compared to hash-based shuffle is that 
 it ends up storing many more Java objects in memory.  If Spark could store 
 map outputs in serialized form, it could
 * spill less often, because the serialized form is more compact
 * reduce GC pressure
 This will only work when the serialized representations of objects are 
 independent of each other and occupy contiguous segments of memory.  E.g., 
 when Kryo reference tracking is left on, objects may contain pointers to 
 objects farther back in the stream, which means the sort can't relocate 
 objects without corrupting them.
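That constraint is exactly what makes a serialized sort possible: if each record's bytes are self-contained, the sorter can rearrange small pointers instead of the records themselves, and no object graph is ever touched. A minimal, self-contained illustration of the idea using plain java.io serialization (this is a sketch of the technique, not Spark's actual ExternalSorter code):

```java
import java.io.*;
import java.util.*;

public class SerializedSortSketch {
    // Serialize records into one contiguous buffer, then sort an array of
    // packed (key, offset) pointers instead of moving the serialized bytes.
    static List<String> sortedRecords(int[] keys, String[] vals) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        long[] pointers = new long[keys.length]; // high 32 bits: key, low 32: offset
        for (int i = 0; i < keys.length; i++) {
            pointers[i] = ((long) keys[i] << 32) | (out.size() & 0xFFFFFFFFL);
            out.writeInt(keys[i]);
            out.writeUTF(vals[i]);
        }
        byte[] bytes = buf.toByteArray();

        // The records never move; only the pointers do.
        Arrays.sort(pointers);

        // Each record deserializes independently from its own offset.
        List<String> result = new ArrayList<>();
        for (long p : pointers) {
            int offset = (int) (p & 0xFFFFFFFFL);
            DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(bytes, offset, bytes.length - offset));
            result.add(in.readInt() + " " + in.readUTF());
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(sortedRecords(new int[]{3, 1, 2},
                new String[]{"cherry", "apple", "banana"}));
        // prints [1 apple, 2 banana, 3 cherry]
    }
}
```

If a serializer wrote back-references between records, the per-offset deserialization in the last loop would break — which is why the independence requirement above matters.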



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-4550) In sort-based shuffle, store map outputs in serialized form

2015-02-04 Thread Sandy Ryza (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-4550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14304864#comment-14304864 ]

Sandy Ryza edited comment on SPARK-4550 at 2/5/15 12:36 AM:


I had heard rumors to that effect, so I ran some experiments and didn't find 
that to be the case:

{code}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.SparkConf
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

val ser1 = new KryoSerializer(new SparkConf)

def serialize(objs: Array[AnyRef], ser: KryoSerializer): Array[Byte] = {
  val instance = ser.newInstance
  val baos = new ByteArrayOutputStream()
  val stream = instance.serializeStream(baos)
  objs.foreach(obj => stream.writeObject(obj))
  stream.close()
  baos.toByteArray
}

val inner = (0 until 10).toArray
val bytes1 = serialize(Array((1, inner), (2, inner)), ser1)

val inner1 = (0 until 10).toArray
val inner2 = (0 until 10).toArray
val bytes2 = serialize(Array((1, inner1), (2, inner2)), ser1)

val secondHalf = new Array[Byte](bytes1.size / 2)
System.arraycopy(bytes1, bytes1.size / 2, secondHalf, 0, bytes1.size / 2)

ser1.newInstance.deserialize[AnyRef](ByteBuffer.wrap(secondHalf))
{code}

A couple of observations:
* bytes1 ends up the same size as bytes2, implying that inner is not 
being reference-tracked across the two writeObject calls
* The last line successfully reproduces the second object, implying that 
no information written at the beginning of the stream is needed to 
deserialize objects later in the stream.

Are there cases or Kryo versions I'm not thinking about?
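For what it's worth, users who are worried about this can also sidestep the question entirely: Spark exposes a setting to disable Kryo reference tracking (at the cost of losing support for shared or cyclic references within a record), e.g. in spark-defaults.conf:

```
spark.kryo.referenceTracking  false
```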



