[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-25 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50115353
  
Running tests with
export SPARK_JAVA_OPTS=-Dspark.shuffle.manager=org.apache.spark.shuffle.sort.SortShuffleManager
causes:
'''
- sorting using mutable pairs *** FAILED ***
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 3.0:1 failed 4 times, most recent failure: Exception failure in TID 14 on host localhost: java.lang.ArrayStoreException: scala.Tuple2
scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
scala.Array$.slowcopy(Array.scala:81)
scala.Array$.copy(Array.scala:107)
scala.collection.mutable.ResizableArray$class.copyToArray(ResizableArray.scala:77)
scala.collection.mutable.ArrayBuffer.copyToArray(ArrayBuffer.scala:47)
scala.collection.TraversableOnce$class.copyToArray(TraversableOnce.scala:241)
scala.collection.AbstractTraversable.copyToArray(Traversable.scala:105)
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:249)
scala.collection.AbstractTraversable.toArray(Traversable.scala:105)
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:62)
org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:61)
org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:581)
org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:581)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:112)
org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:199)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1055)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1039)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1037)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1037)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:641)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:641)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:641)
  ...
'''

The actual classes are :
'''
classes = [class scala.Tuple2] ; 
[class scala.Tuple2,
class scala.Tuple2,
class scala.Tuple2]
'''
obtained using :
'''
  def collect(): Array[T] = {
    // Debug print: show the runtime class of every element in each partition
    println("classes = " + sc.runJob(this, (iter: Iterator[T]) =>
      iter.map(v => if (v == null) null else v.getClass).mkString("[", ",\n\t\t", "]")).
      mkString("", " ; \n\t", ""))
    System.out.flush()
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
''' in RDD.collect
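
For reference, the same shuffle manager can also be selected programmatically
when building the SparkContext for a test run; a minimal sketch, not part of
the original report:
'''
import org.apache.spark.{SparkConf, SparkContext}

// Select the sort-based shuffle manager for a local test run
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("sort-shuffle-test")
  .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
val sc = new SparkContext(conf)
'''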




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-25 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50115453
  
BTW, this is one of 5 failures from core.
I hope there are no merge issues, though.




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-25 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50115862
  
@mridulm that should've been fixed recently in 
https://github.com/mateiz/spark/commit/9c299579f13f004f5fd1f4dd0b98b7d76cac2a55,
 which got rid of custom return types in ShuffledRDD for this reason (see the 
message there). Maybe you need to do a clean rebuild. In an earlier commit I 
also added a version of ShuffleSuite that runs all those tests with sort-based 
shuffle.
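
For context, the effect of that commit is that the RDD produced by a shuffle is
typed by its combiner rather than by a caller-supplied Product2 subtype.
Roughly, as a signature outline (approximate, bodies omitted; the new class
header matches the one listed in the QA results below):
'''
// Before: the element type was a caller-chosen Product2 subclass P
class ShuffledRDD[K, V, P <: Product2[K, V]](prev: RDD[P], part: Partitioner)
  extends RDD[P](prev.context, Nil)

// After: the element type is always (K, C), where C is the combiner type
class ShuffledRDD[K, V, C](prev: RDD[_ <: Product2[K, V]], part: Partitioner)
  extends RDD[(K, C)](prev.context, Nil)
'''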




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-25 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15388684
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.io.{DataInputStream, FileInputStream}
+
+import org.apache.spark.shuffle._
+import org.apache.spark.{TaskContext, ShuffleDependency}
+import org.apache.spark.shuffle.hash.HashShuffleReader
+import org.apache.spark.storage.{DiskBlockManager, FileSegment, 
ShuffleBlockId}
+
+private[spark] class SortShuffleManager extends ShuffleManager {
+  /**
+   * Register a shuffle with the manager and obtain a handle for it to 
pass to tasks.
+   */
+  override def registerShuffle[K, V, C](
+  shuffleId: Int,
+  numMaps: Int,
+  dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
+new BaseShuffleHandle(shuffleId, numMaps, dependency)
+  }
+
+  /**
+   * Get a reader for a range of reduce partitions (startPartition to 
endPartition-1, inclusive).
+   * Called on executors by reduce tasks.
+   */
+  override def getReader[K, C](
+  handle: ShuffleHandle,
+  startPartition: Int,
+  endPartition: Int,
+  context: TaskContext): ShuffleReader[K, C] = {
+// We currently use the same block store shuffle fetcher as the 
hash-based shuffle.
+new HashShuffleReader(
+  handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, 
endPartition, context)
+  }
+
+  /** Get a writer for a given partition. Called on executors by map 
tasks. */
+  override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: 
TaskContext)
+  : ShuffleWriter[K, V] = {
+new SortShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], 
mapId, context)
+  }
+
+  /** Remove a shuffle's metadata from the ShuffleManager. */
+  override def unregisterShuffle(shuffleId: Int): Unit = {}
--- End diff --

Alright, fixed this in the latest commit.




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-25 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50116492
  
Ah, thanks! Rerunning with 9c29957.
Can't pull the PR, and manual merge is painful, hence the delays in testing :-)




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-25 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50159041
  
Not sure if it helps, but you can get PRs via something like `git fetch
apache refs/pull/1499/head`. Easier workflow, perhaps.
On Jul 25, 2014 12:25 AM, Mridul Muralidharan notificati...@github.com
wrote:

 ah, thanks ! rerunning with 9c29957
 https://github.com/apache/spark/commit/9c29957.
 cant pull the pr - and manual merge is painful, hence delays in testing 
:-)






[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-25 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50160338
  
After installing `hub` you can also do a bunch of new stuff on the command 
line, including `hub checkout https://github.com/apache/spark/pull/1499`

https://hub.github.com/




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-25 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50210371
  
QA tests have started for PR 1499. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17202/consoleFull




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-25 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50213171
  
QA results for PR 1499:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class ShuffledRDD[K, V, C](
case class ShuffleIndexBlockId(shuffleId: Int, mapId: Int, reduceId: Int)

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17202/consoleFull




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-24 Thread colorant
Github user colorant commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15331341
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.io.{DataInputStream, FileInputStream}
+
+import org.apache.spark.shuffle._
+import org.apache.spark.{TaskContext, ShuffleDependency}
+import org.apache.spark.shuffle.hash.HashShuffleReader
+import org.apache.spark.storage.{DiskBlockManager, FileSegment, 
ShuffleBlockId}
+
+private[spark] class SortShuffleManager extends ShuffleManager {
+  /**
+   * Register a shuffle with the manager and obtain a handle for it to 
pass to tasks.
+   */
+  override def registerShuffle[K, V, C](
+  shuffleId: Int,
+  numMaps: Int,
+  dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
+new BaseShuffleHandle(shuffleId, numMaps, dependency)
+  }
+
+  /**
+   * Get a reader for a range of reduce partitions (startPartition to 
endPartition-1, inclusive).
+   * Called on executors by reduce tasks.
+   */
+  override def getReader[K, C](
+  handle: ShuffleHandle,
+  startPartition: Int,
+  endPartition: Int,
+  context: TaskContext): ShuffleReader[K, C] = {
+// We currently use the same block store shuffle fetcher as the 
hash-based shuffle.
+new HashShuffleReader(
+  handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, 
endPartition, context)
+  }
+
+  /** Get a writer for a given partition. Called on executors by map 
tasks. */
+  override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: 
TaskContext)
+  : ShuffleWriter[K, V] = {
+new SortShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], 
mapId, context)
+  }
+
+  /** Remove a shuffle's metadata from the ShuffleManager. */
+  override def unregisterShuffle(shuffleId: Int): Unit = {}
--- End diff --

The shuffle output files written by SortShuffleWriter do not get cleaned. We might need
to add a map that saves the registered shuffle handles, and then remove the data files
in the unregisterShuffle method. At present this is not implemented in HashShuffleManager
either, but HashShuffleManager depends on ShuffleBlockManager and the files get cleaned there.

I have a PR (#1241) that generalizes ShuffleBlockManager and hides it behind
ShuffleManager; when the BlockManager removes a shuffle, it will call into this
unregisterShuffle method. I will rebase it once this PR is merged.

Should we fix this issue in my PR, or add the store/clean shuffleHandle
logic here in this PR first?
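
A rough sketch of what that store/clean logic could look like: keep a map from
shuffle ID to its number of map tasks when the shuffle is registered, and
delete each map task's data file (registered under reduce ID 0) plus its index
file when it is unregistered. This is hypothetical code, not from the PR, and
it assumes ShuffleIndexBlockId (the new block ID listed in the QA output above)
lives in org.apache.spark.storage:
'''
package org.apache.spark.shuffle.sort

import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.{ShuffleDependency, SparkEnv, TaskContext}
import org.apache.spark.shuffle._
import org.apache.spark.shuffle.hash.HashShuffleReader
import org.apache.spark.storage.{ShuffleBlockId, ShuffleIndexBlockId}

private[spark] class SortShuffleManager extends ShuffleManager {
  // Hypothetical bookkeeping: shuffleId -> number of map tasks, so the output
  // files can be located and deleted when the shuffle is unregistered.
  private val numMapsForShuffle = new ConcurrentHashMap[Int, Integer]()

  override def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    numMapsForShuffle.put(shuffleId, numMaps)
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }

  override def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C] = {
    new HashShuffleReader(
      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
  }

  override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
      : ShuffleWriter[K, V] = {
    new SortShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
  }

  /** Delete this shuffle's data and index files; a sketch, not the actual patch. */
  override def unregisterShuffle(shuffleId: Int): Unit = {
    val numMaps = numMapsForShuffle.remove(shuffleId)
    if (numMaps != null) {
      val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
      for (mapId <- 0 until numMaps.intValue) {
        val dataFile = diskBlockManager.getFile(ShuffleBlockId(shuffleId, mapId, 0))
        if (dataFile.exists()) { dataFile.delete() }
        val indexFile = diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0))
        if (indexFile.exists()) { indexFile.delete() }
      }
    }
  }
}
'''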




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-24 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15332387
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.io.{DataInputStream, FileInputStream}
+
+import org.apache.spark.shuffle._
+import org.apache.spark.{TaskContext, ShuffleDependency}
+import org.apache.spark.shuffle.hash.HashShuffleReader
+import org.apache.spark.storage.{DiskBlockManager, FileSegment, 
ShuffleBlockId}
+
+private[spark] class SortShuffleManager extends ShuffleManager {
+  /**
+   * Register a shuffle with the manager and obtain a handle for it to 
pass to tasks.
+   */
+  override def registerShuffle[K, V, C](
+  shuffleId: Int,
+  numMaps: Int,
+  dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
+new BaseShuffleHandle(shuffleId, numMaps, dependency)
+  }
+
+  /**
+   * Get a reader for a range of reduce partitions (startPartition to 
endPartition-1, inclusive).
+   * Called on executors by reduce tasks.
+   */
+  override def getReader[K, C](
+  handle: ShuffleHandle,
+  startPartition: Int,
+  endPartition: Int,
+  context: TaskContext): ShuffleReader[K, C] = {
+// We currently use the same block store shuffle fetcher as the 
hash-based shuffle.
+new HashShuffleReader(
+  handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, 
endPartition, context)
+  }
+
+  /** Get a writer for a given partition. Called on executors by map 
tasks. */
+  override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: 
TaskContext)
+  : ShuffleWriter[K, V] = {
+new SortShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], 
mapId, context)
+  }
+
+  /** Remove a shuffle's metadata from the ShuffleManager. */
+  override def unregisterShuffle(shuffleId: Int): Unit = {}
--- End diff --

Good question, but files on disk actually get cleaned by BlockManager 
directly if cleaning is turned on, and sort-based shuffle is set up to only use 
files. ShuffleBlockManager had extra state in memory in the form of metadata, which is why it needs its own cleaner. But I don't think there's an issue here.

It will be important to do something for unregisterShuffle, but right now 
nothing calls that. That's part of the API because I wanted to move the 
MapOutputTracker behind the ShuffleManager as well. But any patches to move 
more stuff behind it are welcome, and I agree the ShuffleBlockManager should be 
specific to hash-based shuffle.




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-24 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49973755
  
Added one more commit that fixes the type of ShuffledRDD, because in this 
new shuffle it's not possible to return a custom Product2 the way it's written 
now, and in the old one it wasn't possible anyway with aggregation. See 
https://github.com/mateiz/spark/commit/9c299579f13f004f5fd1f4dd0b98b7d76cac2a55 
for details. @rxin you should look at this one.

With this commit there's now a version of ShuffleSuite that passes using 
SortBasedShuffle.




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-24 Thread colorant
Github user colorant commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15333148
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.io.{DataInputStream, FileInputStream}
+
+import org.apache.spark.shuffle._
+import org.apache.spark.{TaskContext, ShuffleDependency}
+import org.apache.spark.shuffle.hash.HashShuffleReader
+import org.apache.spark.storage.{DiskBlockManager, FileSegment, 
ShuffleBlockId}
+
+private[spark] class SortShuffleManager extends ShuffleManager {
+  /**
+   * Register a shuffle with the manager and obtain a handle for it to 
pass to tasks.
+   */
+  override def registerShuffle[K, V, C](
+  shuffleId: Int,
+  numMaps: Int,
+  dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
+new BaseShuffleHandle(shuffleId, numMaps, dependency)
+  }
+
+  /**
+   * Get a reader for a range of reduce partitions (startPartition to 
endPartition-1, inclusive).
+   * Called on executors by reduce tasks.
+   */
+  override def getReader[K, C](
+  handle: ShuffleHandle,
+  startPartition: Int,
+  endPartition: Int,
+  context: TaskContext): ShuffleReader[K, C] = {
+// We currently use the same block store shuffle fetcher as the 
hash-based shuffle.
+new HashShuffleReader(
+  handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, 
endPartition, context)
+  }
+
+  /** Get a writer for a given partition. Called on executors by map 
tasks. */
+  override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: 
TaskContext)
+  : ShuffleWriter[K, V] = {
+new SortShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], 
mapId, context)
+  }
+
+  /** Remove a shuffle's metadata from the ShuffleManager. */
+  override def unregisterShuffle(shuffleId: Int): Unit = {}
--- End diff --

Maybe I'm missing some code? But from my understanding, when cleaning is turned on:
with the timestamp-based approach, the BlockManager won't remove shuffle data; the
ShuffleBlockManager does that work by itself. And with the automatic-cleaning approach,
when doCleanShuffle is called, it goes through the current ShuffleBlockManager interface.
In neither case will these SortShuffleWriter-generated files be cleaned? Or do you mean
that, as long as there is no metadata in memory, it is OK for the shuffle files on disk
to not be removed until the application exits?




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-24 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15382131
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.io.{DataInputStream, FileInputStream}
+
+import org.apache.spark.shuffle._
+import org.apache.spark.{TaskContext, ShuffleDependency}
+import org.apache.spark.shuffle.hash.HashShuffleReader
+import org.apache.spark.storage.{DiskBlockManager, FileSegment, 
ShuffleBlockId}
+
+private[spark] class SortShuffleManager extends ShuffleManager {
+  /**
+   * Register a shuffle with the manager and obtain a handle for it to 
pass to tasks.
+   */
+  override def registerShuffle[K, V, C](
+  shuffleId: Int,
+  numMaps: Int,
+  dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
+new BaseShuffleHandle(shuffleId, numMaps, dependency)
+  }
+
+  /**
+   * Get a reader for a range of reduce partitions (startPartition to 
endPartition-1, inclusive).
+   * Called on executors by reduce tasks.
+   */
+  override def getReader[K, C](
+  handle: ShuffleHandle,
+  startPartition: Int,
+  endPartition: Int,
+  context: TaskContext): ShuffleReader[K, C] = {
+// We currently use the same block store shuffle fetcher as the 
hash-based shuffle.
+new HashShuffleReader(
+  handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, 
endPartition, context)
+  }
+
+  /** Get a writer for a given partition. Called on executors by map 
tasks. */
+  override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: 
TaskContext)
+  : ShuffleWriter[K, V] = {
+new SortShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], 
mapId, context)
+  }
+
+  /** Remove a shuffle's metadata from the ShuffleManager. */
+  override def unregisterShuffle(shuffleId: Int): Unit = {}
--- End diff --

Ah, I see, these files are never registered with the BlockManager so they 
won't be cleaned by it. I'll modify ShuffleBlockManager to clean them for now.




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-24 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15383498
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.io.{BufferedOutputStream, File, FileOutputStream, 
DataOutputStream}
+
+import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext}
+import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.{ShuffleWriter, BaseShuffleHandle}
+import org.apache.spark.storage.ShuffleBlockId
+import org.apache.spark.util.collection.ExternalSorter
+
+private[spark] class SortShuffleWriter[K, V, C](
+handle: BaseShuffleHandle[K, V, C],
+mapId: Int,
+context: TaskContext)
+  extends ShuffleWriter[K, V] with Logging {
+
+  private val dep = handle.dependency
+  private val numPartitions = dep.partitioner.numPartitions
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val ser = 
Serializer.getSerializer(dep.serializer.getOrElse(null))
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+
+  private var sorter: ExternalSorter[K, V, _] = null
+  private var outputFile: File = null
+
+  private var stopping = false
+  private var mapStatus: MapStatus = null
+
+  /** Write a bunch of records to this task's output */
+  override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
+val partitions: Iterator[(Int, Iterator[Product2[K, _]])] = {
+  if (dep.mapSideCombine) {
+if (!dep.aggregator.isDefined) {
+  throw new IllegalStateException("Aggregator is empty for map-side combine")
+}
+sorter = new ExternalSorter[K, V, C](
+  dep.aggregator, Some(dep.partitioner), dep.keyOrdering, 
dep.serializer)
+sorter.write(records)
+sorter.partitionedIterator
+  } else {
+// In this case we pass neither an aggregator nor an ordering to 
the sorter, because we
+// don't care whether the keys get sorted in each partition; that 
will be done on the
+// reduce side if the operation being run is sortByKey.
+sorter = new ExternalSorter[K, V, V](
+  None, Some(dep.partitioner), None, dep.serializer)
+sorter.write(records)
+sorter.partitionedIterator
+  }
+}
+
+// Create a single shuffle file with reduce ID 0 that we'll write all 
results to. We'll later
+// serve different ranges of this file using an index file that we 
create at the end.
+val blockId = ShuffleBlockId(dep.shuffleId, mapId, 0)
+outputFile = blockManager.diskBlockManager.getFile(blockId)
+
+// Track location of each range in the output file
+val offsets = new Array[Long](numPartitions + 1)
+val lengths = new Array[Long](numPartitions)
+
+// Statistics
+var totalBytes = 0L
+var totalTime = 0L
+
+for ((id, elements) <- partitions) {
+  if (elements.hasNext) {
+val writer = blockManager.getDiskWriter(blockId, outputFile, ser, 
fileBufferSize)
+for (elem <- elements) {
+  writer.write(elem)
+}
+writer.commit()
+writer.close()
+val segment = writer.fileSegment()
+offsets(id + 1) = segment.offset + segment.length
+lengths(id) = segment.length
+totalTime += writer.timeWriting()
+totalBytes += segment.length
+  } else {
+// Don't create a new writer to avoid writing any headers and 
things like that
+offsets(id + 1) = offsets(id)
+  }
+}
+
+val 
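
The quoted diff is cut off above, but its write loop tracks per-partition
offsets and lengths so that the single output file can later be served in
ranges through an index file, as the in-code comment notes. A hypothetical
sketch of what such an index-writing step could look like, reusing the names
from the excerpt (offsets, blockManager, dep, mapId) together with the
ShuffleIndexBlockId listed in the QA output above; this is illustrative only,
not the code from the PR:
'''
// Hypothetical sketch: persist the partition offsets so a fetcher can locate
// each reducer's byte range within the single output file written above.
val indexBlockId = ShuffleIndexBlockId(dep.shuffleId, mapId, 0)
val indexFile = blockManager.diskBlockManager.getFile(indexBlockId)
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
try {
  // offsets has numPartitions + 1 entries: offsets(i) is where partition i
  // starts, and the final entry marks the end of the file.
  for (offset <- offsets) {
    out.writeLong(offset)
  }
} finally {
  out.close()
}
'''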

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15274240
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional Partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional Ordering to sort keys within each partition; 
should be a total ordering
+ * @param serializer serializer to use when spilling to disk
+ *
+ * Note that if an Ordering is given, we'll always sort using it, so only 
provide it if you really
+ * want the output keys to be sorted. In a map task without map-side 
combine for example, you
+ * probably want to pass None as the ordering to avoid extra sorting. On 
the other hand, if you do
+ * want to do combining, having an Ordering is more efficient than not 
having it.
+ *
+ * At a high level, this class works as follows:
+ *
+ * - We repeatedly fill up buffers of in-memory data, using either a 
SizeTrackingAppendOnlyMap if
+ *   we want to combine by key, or a simple SizeTrackingBuffer if we 
don't. Inside these buffers,
+ *   we sort elements of type ((Int, K), C) where the Int is the partition 
ID. This is done to
+ *   avoid calling the partitioner multiple times on the same key (e.g. 
for RangePartitioner).
+ *
+ * - When each buffer reaches our memory limit, we spill it to a file. 
This file is sorted first
+ *   by partition ID and possibly second by key or by hash code of the 
key, if we want to do
+ *   aggregation. For each file, we track how many objects were in each 
partition in memory, so we
+ *   don't have to write out the partition ID for every element.
+ *
+ * - When the user requests an iterator, the spilled files are merged, 
along with any remaining
+ *   in-memory data, using the same sort order defined above (unless both 
sorting and aggregation
+ *   are disabled). If we need to aggregate by key, we either use a total 
ordering from the
+ *   ordering parameter, or read the keys with the same hash code and 
compare them with each other
+ *   for equality to merge values.
+ *
+ * - Users are expected to call stop() at the end to delete all the 
intermediate files.
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = 
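
The class doc above spells out the intended usage: write the records, consume
the partitioned output, then call stop() to delete the intermediate files.
ExternalSorter is private[spark] and needs a live SparkEnv, so this only makes
sense inside Spark itself (e.g. in a test); a hedged usage sketch with
illustrative parameter values, following how SortShuffleWriter drives the
sorter in the diff earlier in the thread:
'''
import org.apache.spark.{Aggregator, HashPartitioner}
import org.apache.spark.util.collection.ExternalSorter

// Illustrative records: (word, count) pairs to be combined per key.
val records: Iterator[Product2[String, Int]] = Iterator(("a", 1), ("b", 2), ("a", 3))

// Combine values per key and partition them for a shuffle with 4 reducers.
val sorter = new ExternalSorter[String, Int, Int](
  aggregator = Some(new Aggregator[String, Int, Int](v => v, _ + _, _ + _)),
  partitioner = Some(new HashPartitioner(4)),
  ordering = None,
  serializer = None)

sorter.write(records)

// Iterate over (partitionId, elements) pairs; the real writer streams each
// partition's elements into one byte range of a single output file.
for ((partitionId, elements) <- sorter.partitionedIterator) {
  elements.foreach(kv => println(s"partition $partitionId: ${kv._1} -> ${kv._2}"))
}

sorter.stop()  // delete any intermediate spill files
'''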

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-23 Thread colorant
Github user colorant commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15276601
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional Partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional Ordering to sort keys within each partition; 
should be a total ordering
+ * @param serializer serializer to use when spilling to disk
+ *
+ * Note that if an Ordering is given, we'll always sort using it, so only 
provide it if you really
+ * want the output keys to be sorted. In a map task without map-side 
combine for example, you
+ * probably want to pass None as the ordering to avoid extra sorting. On 
the other hand, if you do
+ * want to do combining, having an Ordering is more efficient than not 
having it.
+ *
+ * At a high level, this class works as follows:
+ *
+ * - We repeatedly fill up buffers of in-memory data, using either a 
SizeTrackingAppendOnlyMap if
+ *   we want to combine by key, or a simple SizeTrackingBuffer if we 
don't. Inside these buffers,
+ *   we sort elements of type ((Int, K), C) where the Int is the partition 
ID. This is done to
+ *   avoid calling the partitioner multiple times on the same key (e.g. 
for RangePartitioner).
+ *
+ * - When each buffer reaches our memory limit, we spill it to a file. 
This file is sorted first
+ *   by partition ID and possibly second by key or by hash code of the 
key, if we want to do
+ *   aggregation. For each file, we track how many objects were in each 
partition in memory, so we
+ *   don't have to write out the partition ID for every element.
+ *
+ * - When the user requests an iterator, the spilled files are merged, 
along with any remaining
+ *   in-memory data, using the same sort order defined above (unless both 
sorting and aggregation
+ *   are disabled). If we need to aggregate by key, we either use a total 
ordering from the
+ *   ordering parameter, or read the keys with the same hash code and 
compare them with each other
+ *   for equality to merge values.
+ *
+ * - Users are expected to call stop() at the end to delete all the 
intermediate files.
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = 

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-23 Thread colorant
Github user colorant commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15277089
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional Partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional Ordering to sort keys within each partition; 
should be a total ordering
+ * @param serializer serializer to use when spilling to disk
+ *
+ * Note that if an Ordering is given, we'll always sort using it, so only 
provide it if you really
+ * want the output keys to be sorted. In a map task without map-side 
combine for example, you
+ * probably want to pass None as the ordering to avoid extra sorting. On 
the other hand, if you do
+ * want to do combining, having an Ordering is more efficient than not 
having it.
+ *
+ * At a high level, this class works as follows:
+ *
+ * - We repeatedly fill up buffers of in-memory data, using either a 
SizeTrackingAppendOnlyMap if
+ *   we want to combine by key, or a simple SizeTrackingBuffer if we 
don't. Inside these buffers,
+ *   we sort elements of type ((Int, K), C) where the Int is the partition 
ID. This is done to
+ *   avoid calling the partitioner multiple times on the same key (e.g. 
for RangePartitioner).
+ *
+ * - When each buffer reaches our memory limit, we spill it to a file. 
This file is sorted first
+ *   by partition ID and possibly second by key or by hash code of the 
key, if we want to do
+ *   aggregation. For each file, we track how many objects were in each 
partition in memory, so we
+ *   don't have to write out the partition ID for every element.
+ *
+ * - When the user requests an iterator, the spilled files are merged, 
along with any remaining
+ *   in-memory data, using the same sort order defined above (unless both 
sorting and aggregation
+ *   are disabled). If we need to aggregate by key, we either use a total 
ordering from the
+ *   ordering parameter, or read the keys with the same hash code and 
compare them with each other
+ *   for equality to merge values.
+ *
+ * - Users are expected to call stop() at the end to delete all the 
intermediate files.
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = 

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15288486
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional Partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional Ordering to sort keys within each partition; 
should be a total ordering
+ * @param serializer serializer to use when spilling to disk
+ *
+ * Note that if an Ordering is given, we'll always sort using it, so only 
provide it if you really
+ * want the output keys to be sorted. In a map task without map-side 
combine for example, you
+ * probably want to pass None as the ordering to avoid extra sorting. On 
the other hand, if you do
+ * want to do combining, having an Ordering is more efficient than not 
having it.
+ *
+ * At a high level, this class works as follows:
+ *
+ * - We repeatedly fill up buffers of in-memory data, using either a 
SizeTrackingAppendOnlyMap if
+ *   we want to combine by key, or a simple SizeTrackingBuffer if we 
don't. Inside these buffers,
+ *   we sort elements of type ((Int, K), C) where the Int is the partition 
ID. This is done to
+ *   avoid calling the partitioner multiple times on the same key (e.g. 
for RangePartitioner).
+ *
+ * - When each buffer reaches our memory limit, we spill it to a file. 
This file is sorted first
+ *   by partition ID and possibly second by key or by hash code of the 
key, if we want to do
+ *   aggregation. For each file, we track how many objects were in each 
partition in memory, so we
+ *   don't have to write out the partition ID for every element.
+ *
+ * - When the user requests an iterator, the spilled files are merged, 
along with any remaining
+ *   in-memory data, using the same sort order defined above (unless both 
sorting and aggregation
+ *   are disabled). If we need to aggregate by key, we either use a total 
ordering from the
+ *   ordering parameter, or read the keys with the same hash code and 
compare them with each other
+ *   for equality to merge values.
+ *
+ * - Users are expected to call stop() at the end to delete all the 
intermediate files.
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = 

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-23 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15310488
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional Partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional Ordering to sort keys within each partition; 
should be a total ordering
+ * @param serializer serializer to use when spilling to disk
+ *
+ * Note that if an Ordering is given, we'll always sort using it, so only 
provide it if you really
+ * want the output keys to be sorted. In a map task without map-side 
combine for example, you
+ * probably want to pass None as the ordering to avoid extra sorting. On 
the other hand, if you do
+ * want to do combining, having an Ordering is more efficient than not 
having it.
+ *
+ * At a high level, this class works as follows:
+ *
+ * - We repeatedly fill up buffers of in-memory data, using either a 
SizeTrackingAppendOnlyMap if
+ *   we want to combine by key, or a simple SizeTrackingBuffer if we 
don't. Inside these buffers,
+ *   we sort elements of type ((Int, K), C) where the Int is the partition 
ID. This is done to
+ *   avoid calling the partitioner multiple times on the same key (e.g. 
for RangePartitioner).
+ *
+ * - When each buffer reaches our memory limit, we spill it to a file. 
This file is sorted first
+ *   by partition ID and possibly second by key or by hash code of the 
key, if we want to do
+ *   aggregation. For each file, we track how many objects were in each 
partition in memory, so we
+ *   don't have to write out the partition ID for every element.
+ *
+ * - When the user requests an iterator, the spilled files are merged, 
along with any remaining
+ *   in-memory data, using the same sort order defined above (unless both 
sorting and aggregation
+ *   are disabled). If we need to aggregate by key, we either use a total 
ordering from the
+ *   ordering parameter, or read the keys with the same hash code and 
compare them with each other
+ *   for equality to merge values.
+ *
+ * - Users are expected to call stop() at the end to delete all the 
intermediate files.
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = 

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-23 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15310561
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional Partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional Ordering to sort keys within each partition; 
should be a total ordering
+ * @param serializer serializer to use when spilling to disk
+ *
+ * Note that if an Ordering is given, we'll always sort using it, so only 
provide it if you really
+ * want the output keys to be sorted. In a map task without map-side 
combine for example, you
+ * probably want to pass None as the ordering to avoid extra sorting. On 
the other hand, if you do
+ * want to do combining, having an Ordering is more efficient than not 
having it.
+ *
+ * At a high level, this class works as follows:
+ *
+ * - We repeatedly fill up buffers of in-memory data, using either a 
SizeTrackingAppendOnlyMap if
+ *   we want to combine by key, or a simple SizeTrackingBuffer if we 
don't. Inside these buffers,
+ *   we sort elements of type ((Int, K), C) where the Int is the partition 
ID. This is done to
+ *   avoid calling the partitioner multiple times on the same key (e.g. 
for RangePartitioner).
+ *
+ * - When each buffer reaches our memory limit, we spill it to a file. 
This file is sorted first
+ *   by partition ID and possibly second by key or by hash code of the 
key, if we want to do
+ *   aggregation. For each file, we track how many objects were in each 
partition in memory, so we
+ *   don't have to write out the partition ID for every element.
+ *
+ * - When the user requests an iterator, the spilled files are merged, 
along with any remaining
+ *   in-memory data, using the same sort order defined above (unless both 
sorting and aggregation
+ *   are disabled). If we need to aggregate by key, we either use a total 
ordering from the
+ *   ordering parameter, or read the keys with the same hash code and 
compare them with each other
+ *   for equality to merge values.
+ *
+ * - Users are expected to call stop() at the end to delete all the 
intermediate files.
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = 
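
To make the ((Int, K), C) layout described in the class comment above concrete, here is a minimal, self-contained Scala sketch (the object and method names are illustrative, not part of the patch): each buffered element carries its partition ID so the partitioner is consulted only once per record, and elements can then be ordered by partition ID first and key hash second when no total Ordering[K] is supplied.

```scala
import java.util.Comparator

object PartitionKeySortSketch {
  // ((partitionId, key), combiner) -- the element shape the buffers store.
  type Element[K, C] = ((Int, K), C)

  // Order by partition ID first, then by the key's hash code (the partial
  // ordering used for partial aggregation when no total Ordering[K] is given).
  def partitionKeyHashComparator[K, C]: Comparator[Element[K, C]] =
    new Comparator[Element[K, C]] {
      override def compare(a: Element[K, C], b: Element[K, C]): Int = {
        val byPartition = Integer.compare(a._1._1, b._1._1)
        if (byPartition != 0) byPartition
        else Integer.compare(a._1._2.hashCode(), b._1._2.hashCode())
      }
    }

  def main(args: Array[String]): Unit = {
    val cmp = partitionKeyHashComparator[String, Int]
    val elems = Seq(((1, "b"), 2), ((0, "a"), 1), ((1, "a"), 3))
    val sorted = elems.sortWith((x, y) => cmp.compare(x, y) < 0)
    println(sorted)  // elements for partition 0 come before those for partition 1
  }
}
```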

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-23 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49924823
  
@colorant @mridulm for the serializing part, one issue is that as we merge 
streams we already need a heap of O(num streams) objects, so if you're worried 
about the objects being large, deferring the read won't help tremendously. I'm 
hoping to fix that by adding a max merge factor and supporting hierarchical 
merges later, but I think we can do it after this first PR. (If possible I 
would like to get it in 1.1 too, since it's not that much code. But I don't 
want to have a giant PR waiting that blocks other things.)
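
As a rough sketch of the merge structure being discussed (illustrative code only, not the PR's implementation; `mergeSorted` and the surrounding object are made-up names): each spilled stream contributes only its current head element to a priority queue, so the merge itself holds O(number of streams) objects, and a max merge factor would simply cap how many streams feed one such queue at a time, merging groups into intermediate runs first.

```scala
import scala.collection.mutable

object KWayMergeSketch {
  def mergeSorted[T](streams: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] = {
    // Queue entries are (head element, stream index); the smallest head comes out first.
    val heap = mutable.PriorityQueue.empty[(T, Int)](Ordering.by[(T, Int), T](_._1).reverse)
    val its = streams.toIndexedSeq
    for ((it, i) <- its.zipWithIndex if it.hasNext) heap.enqueue((it.next(), i))

    new Iterator[T] {
      def hasNext: Boolean = heap.nonEmpty
      def next(): T = {
        val (elem, i) = heap.dequeue()
        if (its(i).hasNext) heap.enqueue((its(i).next(), i))  // refill from the same stream
        elem
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val merged = mergeSorted(Seq(Iterator(1, 4, 7), Iterator(2, 5), Iterator(3, 6)))
    println(merged.toList)  // List(1, 2, 3, 4, 5, 6, 7)
  }
}
```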




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49925284
  
QA tests have started for PR 1499. This patch merges cleanly. View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17055/consoleFull




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49938524
  
QA results for PR 1499:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17055/consoleFull




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-23 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49949511
  
@mateiz The total memory overhead actually goes much higher than
num_streams, right?
It should be on the order of num_streams + num_values for this key.

For fairly large values, the latter might fit into memory, but the former
might not (particularly as the number of mappers increases).

Or did I get this wrong from the PR?




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-23 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49949660
  
No, we only read one value at a time. If you're doing something like 
groupByKey, the combine function may create an ArrayBuffer or stuff like that, 
but that's only for the key actively being read. As soon as one stream moves on 
to the next key, we read the first key-value pair in it but stop reading others.
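
A small illustration of that point (the helper and names below are invented here; this is not the patch's mergeWithAggregation): only values whose key matches the key currently being emitted are pulled, and each stream is advanced no further than its head.

```scala
import scala.collection.mutable.ArrayBuffer

object ActiveKeyCombineSketch {
  // Pull values from each stream only while its head still matches the active key.
  def combineForKey[K, V](key: K, streams: Seq[BufferedIterator[(K, V)]]): ArrayBuffer[V] = {
    val buf = new ArrayBuffer[V]
    for (s <- streams) {
      while (s.hasNext && s.head._1 == key) buf += s.next()._2
    }
    buf
  }

  def main(args: Array[String]): Unit = {
    val s1 = Iterator(("a", 1), ("b", 2)).buffered
    val s2 = Iterator(("a", 3), ("c", 4)).buffered
    println(combineForKey("a", Seq(s1, s2)))  // ArrayBuffer(1, 3)
    println(s1.head)  // ("b", 2) -- the stream advanced only past the active key
  }
}
```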




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-23 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49949874
  
Let me know if I've missed something. That combining part happens in 
mergeWithAggregation. Actually for many operations map-side combine is disabled 
so we don't even do it (and ExternalSorter is not used on the reduce side right 
now).




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-23 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15324369
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -120,8 +124,10 @@ class ExternalAppendOnlyMap[K, V, C](
 val update: (Boolean, C) => C = (hadVal, oldVal) => {
   if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
 }
-if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
-  val mapSize = currentMap.estimateSize()
+if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
+    currentMap.estimateSize() >= myMemoryThreshold)
--- End diff --

Did you mean the following:
```
if (elementsRead > trackMemoryThreshold &&
    (elementsRead % 32 == 0 || currentMap.estimateSize() > myMemoryThreshold))
```
Otherwise say if the 33rd element was huge and we already went past our 
current memory threshold, we will have to wait for another 31 elements before 
we find a new threshold.
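
Restated as standalone predicates (variable names follow the diff; everything else here is illustrative), the difference is that the `&&` form only looks at the estimated size on every 32nd element, while the suggested `||` form checks it on every element once past `trackMemoryThreshold`:

```scala
// Condition as written in the patch: the size check runs only when elementsRead
// is a multiple of 32, so a huge 33rd element is not noticed until element 64.
def spillCheckAsWritten(elementsRead: Long, estimatedSize: Long,
                        trackMemoryThreshold: Long, myMemoryThreshold: Long): Boolean =
  elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
    estimatedSize >= myMemoryThreshold

// Suggested variant: past trackMemoryThreshold an oversized map is caught right
// away, at the cost of estimating the size on every element.
def spillCheckSuggested(elementsRead: Long, estimatedSize: Long,
                        trackMemoryThreshold: Long, myMemoryThreshold: Long): Boolean =
  elementsRead > trackMemoryThreshold &&
    (elementsRead % 32 == 0 || estimatedSize > myMemoryThreshold)
```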




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49956204
  
QA tests have started for PR 1499. This patch merges cleanly. View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17078/consoleFull




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49961533
  
QA results for PR 1499:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17078/consoleFull




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-22 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49706936
  
@aarondav pushed an update to the grow code as well, which will now use 
estimateSize. I implemented the same thing in EAOM.

@colorant fixed those, thanks.




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49707121
  
QA tests have started for PR 1499. This patch merges cleanly. View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16948/consoleFull




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49715947
  
QA results for PR 1499:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16948/consoleFull




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49788955
  
QA tests have started for PR 1499. This patch merges cleanly. View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16983/consoleFull




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49800606
  
QA tests have started for PR 1499. This patch merges cleanly. View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16990/consoleFull




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49802023
  
QA results for PR 1499:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16983/consoleFull




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-22 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49804353
  
We saw a bunch of EOF Exceptions from SpillReader.

java.io.EOFException
at 
java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2577)
at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1315)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:79)
at 
org.apache.spark.util.collection.ExternalSorter$SpillReader.org$apache$spark$util$collection$ExternalSorter$SpillReader$$readNextItem(ExternalSorter.scala:526)
at 
org.apache.spark.util.collection.ExternalSorter$SpillReader$$anon$6.hasNext(ExternalSorter.scala:560)
at 
scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:432)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter$$anonfun$write$2.apply(SortShuffleWriter.scala:93)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter$$anonfun$write$2.apply(SortShuffleWriter.scala:89)


(The exact line numbers might not match master, but I hope the issue comes across.)




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-22 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15259118
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use when spilling to disk
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. 
Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can 
potentially increase
+  // the size of a merged element when we add values with the same key, 
it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+    val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+    val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
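  // Illustrative arithmetic, not part of the patch: with an 8 GB executor heap and
  // the defaults above, maxMemoryThreshold = 8 GB * 0.3 * 0.8, i.e. roughly 1.9 GB
  // shared by all tasks spilling on that executor.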
+
+  // A comparator for keys K that orders them within a partition to allow 
partial aggregation.
+  // Can be a partial ordering by hash code if a total ordering is not 
provided through by the
 

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-22 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15259190
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
 

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49811607
  
QA results for PR 1499:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16990/consoleFull




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-22 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49826834
  
@mridulm which version of the code was that with? Right now line 526 of 
ExternalSorter is not calling readObject, so it's hard to debug. There might've 
been some fixes to this before too.




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-22 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49833579
  
I had pulled about 20 mins after I mailed you ...
I have elaborated inline in the code on why this occurs - we can ignore it
for now, since it happens even in the 'regular' case: we had fixed it so long
ago that I had forgotten about it and assumed it was some other issue!

We can punt on resolving this for now and address it when the consolidated
shuffle PR comes out.




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-22 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15271480
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
  

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15192915
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
  

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15192894
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
  

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15193928
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
  

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15194652
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
  

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15195282
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use when spilling to disk
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions  1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt(spark.shuffle.file.buffer.kb, 
100) * 1024
+  private val serializerBatchSize = 
conf.getLong(spark.shuffle.spill.batchSize, 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. 
Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can 
potentially increase
+  // the size of a merged element when we add values with the same key, 
it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // A comparator for keys K that orders them within a partition to allow 
partial aggregation.
+  // Can be a partial ordering by hash code if a total ordering is not 
provided through by the
  

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15196170
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use when spilling to disk
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. 
Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can 
potentially increase
+  // the size of a merged element when we add values with the same key, 
it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // A comparator for keys K that orders them within a partition to allow 
partial aggregation.
+  // Can be a partial ordering by hash code if a total ordering is not 
provided through by the
  

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-21 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49669025
  
I've now updated it with, I believe, all of @aarondav's fixes and an 
optimization he suggested for in-memory-only data (don't bother sorting by key 
or merge-sorting).

I also tried running this on all our test suites by making sort-based 
shuffle the default, and it almost works, except that a test with custom 
Product2 types in ShuffledRDD breaks. However, upon further investigation I'm 
not sure that feature was fully supported even in the current version (our 
current spilling code would also not return the right type), so we might remove 
it.




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-21 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49669437
  
Actually I missed one comment, the one on the condition for spilling; I'll 
have to fix that in both ExternalSorter and EAOM (ExternalAppendOnlyMap).




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-21 Thread colorant
Github user colorant commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15208983
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.io.{DataInputStream, FileInputStream}
+
+import org.apache.spark.shuffle._
+import org.apache.spark.{TaskContext, ShuffleDependency}
+import org.apache.spark.shuffle.hash.HashShuffleReader
+import org.apache.spark.storage.{DiskBlockManager, FileSegment, 
ShuffleBlockId}
+
+private[spark] class SortShuffleManager extends ShuffleManager {
+  /**
+   * Register a shuffle with the manager and obtain a handle for it to 
pass to tasks.
+   */
+  override def registerShuffle[K, V, C](
+  shuffleId: Int,
+  numMaps: Int,
+  dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
+new BaseShuffleHandle(shuffleId, numMaps, dependency)
+  }
+
+  /**
+   * Get a reader for a range of reduce partitions (startPartition to 
endPartition-1, inclusive).
+   * Called on executors by reduce tasks.
+   */
+  override def getReader[K, C](
+  handle: ShuffleHandle,
+  startPartition: Int,
+  endPartition: Int,
+  context: TaskContext): ShuffleReader[K, C] = {
+// We currently use the same block store shuffle fetcher as the 
hash-based shuffle.
+new HashShuffleReader(
+  handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, 
endPartition, context)
+  }
+
+  /** Get a writer for a given partition. Called on executors by map 
tasks. */
+  override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: 
TaskContext)
+  : ShuffleWriter[K, V] = {
+new SortShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], 
mapId, context)
+  }
+
+  /** Remove a shuffle's metadata from the ShuffleManager. */
+  override def unregisterShuffle(shuffleId: Int): Unit = {}
+
+  /** Shut down this ShuffleManager. */
+  override def stop(): Unit = {}
+
+  /** Get the location of a block in a map output file. Uses the index 
file we create for it. */
+  def getBlockLocation(blockId: ShuffleBlockId, diskManager: 
DiskBlockManager): FileSegment = {
+// The block is actually going to be a range of a single map output 
file for this map, so
+// so figure out the ID of the consolidated file, then the offset 
within that from our index
--- End diff --

extra so
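
To make the quoted getBlockLocation idea concrete, here is a minimal sketch of 
an index-file lookup, assuming the writer records (numPartitions + 1) cumulative 
byte offsets into the consolidated data file. The method name and file layout 
below are illustrative, not necessarily the PR's exact format.
'''
import java.io.{DataInputStream, File, FileInputStream}

// Reads the byte range of one reduce partition out of a consolidated map output
// file, given an index file of (numPartitions + 1) longs: entry i is the offset
// where partition i starts, entry i+1 is where it ends.
def readPartitionRange(indexFile: File, reduceId: Int): (Long, Long) = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    in.skip(reduceId * 8L)      // jump to this partition's offset entry
                                // (a robust version would loop until fully skipped)
    val start = in.readLong()   // first byte of this partition's data
    val end = in.readLong()     // first byte of the next partition
    (start, end - start)        // (offset, length) inside the data file
  } finally {
    in.close()
  }
}
'''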




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-21 Thread colorant
Github user colorant commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15209013
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.io.{BufferedOutputStream, File, FileOutputStream, 
DataOutputStream}
+
+import org.apache.spark.shuffle.{ShuffleWriter, BaseShuffleHandle}
+import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext}
+import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.util.collection.ExternalSorter
+import org.apache.spark.storage.ShuffleBlockId
+import org.apache.spark.executor.ShuffleWriteMetrics
--- End diff --

reorder import list?




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15157617
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use when spilling to disk
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. 
Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can 
potentially increase
+  // the size of a merged element when we add values with the same key, 
it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // A comparator for keys K that orders them within a partition to allow 
partial aggregation.
+  // Can be a partial ordering by hash code if a total ordering is not 
provided through by the
  

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15157645
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use when spilling to disk
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. 
Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can 
potentially increase
+  // the size of a merged element when we add values with the same key, 
it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // A comparator for keys K that orders them within a partition to allow 
partial aggregation.
+  // Can be a partial ordering by hash code if a total ordering is not 
provided through by the
  

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15147889
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
--- End diff --

Note that the Ordering is not currently used, but will allow us to sort 
keys within each partition so that we can use this in sortByKey




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread mateiz
GitHub user mateiz opened a pull request:

https://github.com/apache/spark/pull/1499

(WIP) SPARK-2045 Sort-based shuffle

This adds a new ShuffleManager based on sorting, as described in 
https://issues.apache.org/jira/browse/SPARK-2045. The bulk of the code is in an 
ExternalSorter class that is similar to ExternalAppendOnlyMap, but sorts 
key-value pairs by partition ID and can be used to create a single sorted file 
with a map task's output. (Longer-term I think this can take on the remaining 
functionality in ExternalAppendOnlyMap and replace it so we don't have code 
duplication.)

The main TODOs still left are enabling ExternalSorter to merge *across* 
spilled files, though this may be of limited utility (if the data is highly 
combinable this will happen early on), and adding more tests (e.g. a version of 
our shuffle suite that runs on this).

Despite this, it already seems to work pretty well (running successfully 
in cases where the hash shuffle would OOM, such as 1000 reduce tasks on 
executors with only 1G of memory), and it appears to be comparable in speed to, 
or faster than, hash-based shuffle (it creates far fewer files for the OS to 
keep track of). So I'm posting it to get some early feedback.

After these TODOs are done, I'd also like to enable ExternalSorter to sort 
data within each partition by a key as well, which will allow us to use it to 
implement external spilling in reduce tasks in `sortByKey`.
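
To make the core idea concrete, here is a toy, purely in-memory sketch of what 
the sorter does with a map task's output before writing it as one file: tag 
each record with its partition ID and sort by (partition, key). The names are 
illustrative; the real ExternalSorter additionally combines, spills, and merges.
'''
import org.apache.spark.Partitioner

// Toy version of the sort step: after this, records for each partition are
// contiguous, so they can be written as a single file with one byte range
// per partition.
def sortForShuffle[K, V](records: Seq[(K, V)], partitioner: Partitioner)
    (implicit ord: Ordering[K]): Seq[((Int, K), V)] = {
  records
    .map { case (k, v) => ((partitioner.getPartition(k), k), v) }   // tag with partition ID
    .sortBy(_._1)(Ordering.Tuple2(Ordering.Int, ord))               // partition first, then key
}
'''
With a HashPartitioner over, say, 4 partitions, the output puts all records of 
partition 0 first, then partition 1, and so on, which is exactly the layout a 
shuffle fetch needs.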

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mateiz/spark sort-based-shuffle

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1499.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1499


commit cf014cb3ef394f8b34c717a2ab8c92f95edf6269
Author: Matei Zaharia ma...@databricks.com
Date:   2014-07-17T02:13:54Z

Scaffolding for sort-based shuffle

commit c1279c4b43121ad47d4e0c0cd46ba978fc29c7de
Author: Matei Zaharia ma...@databricks.com
Date:   2014-07-18T02:07:41Z

Some more partial work towards sort-based shuffle

commit 784e85a23c8bdea821dbbd2aa43f7e202ac39fe3
Author: Matei Zaharia ma...@databricks.com
Date:   2014-07-18T07:26:49Z

More partial work towards sort-based shuffle

commit 727685bd23fa83df18f471ece5ded657f8656f81
Author: Matei Zaharia ma...@databricks.com
Date:   2014-07-19T04:18:17Z

More work

commit abf052dbaafaea604f9b807e8ec5c38bd8e354fa
Author: Matei Zaharia ma...@databricks.com
Date:   2014-07-19T08:35:54Z

Add more error handling and tests for error cases

commit 066553bc99a389556eb072161fe5acc43dd73262
Author: Matei Zaharia ma...@databricks.com
Date:   2014-07-20T06:40:40Z

Add spill metrics to map tasks






[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49539437
  
QA tests have started for PR 1499. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16864/consoleFull




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15147904
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/SizeTrackingBuffer.scala 
---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util.Arrays
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.util.SizeEstimator
+
+/**
+ * Append-only buffer that keeps track of its estimated size in bytes.
+ * We sample with a slow exponential back-off using the SizeEstimator to 
amortize the time,
+ * as each call to SizeEstimator can take a sizable amount of time (order 
of a few milliseconds).
+ *
+ * The tracking code is copied from SizeTrackingAppendOnlyMap -- we'll 
factor that out soon.
+ */
+private[spark] class SizeTrackingBuffer[T <: AnyRef](initialCapacity: Int = 64)
+  extends SizeTrackingCollection[T]
--- End diff --

This could be replaced with the buffer in #1165 except we also need 
destructiveSortedIterator. And to sort stuff in-place, that in turn requires T 
to be a subclass of AnyRef (otherwise we'd have to pass the Array[T] to a 
different Arrays.sort() method based on its type).
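
The quoted doc comment describes sampling with a slow exponential back-off; as 
a rough sketch of that idea (SizeEstimator.estimate is Spark's internal 
utility, so this assumes code living under the spark package; the other names 
and the 1.1 growth factor are illustrative):
'''
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.util.SizeEstimator

// Append-only buffer that re-estimates its size only at exponentially spaced
// update counts, extrapolating bytes-per-element in between, so the expensive
// SizeEstimator call is amortized over many appends.
class SampledSizeBuffer[T <: AnyRef] {
  private val data = new ArrayBuffer[T]
  private var numUpdates = 0L
  private var nextSampleNum = 1L            // next update count at which to sample
  private var bytesPerElement = 0.0

  def +=(elem: T): Unit = {
    data += elem
    numUpdates += 1
    if (numUpdates == nextSampleNum) {
      bytesPerElement = SizeEstimator.estimate(data).toDouble / data.size
      nextSampleNum = math.ceil(numUpdates * 1.1).toLong   // back off exponentially
    }
  }

  def estimateSize(): Long = (bytesPerElement * data.size).toLong
}
'''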




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15148079
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer.getOrElse(null))
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. 
Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can 
potentially increase
+  // the size of a merged element when we add values with the same key, 
it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // For now, just compare them by partition; later we can compare by key 
as well
+  private val comparator = new Comparator[((Int, K), C)] {
+override def compare(a: ((Int, K), C), b: ((Int, K), C)): Int = {
+  a._1._1 - 
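
The quoted comparator is cut off by the archive; as a hedged sketch of what the 
comment above it describes (compare records by partition ID for now, leaving 
room for a later key comparison), not the PR's exact code:
'''
import java.util.Comparator

// Orders ((partitionId, key), value) records by partition ID only; a key-level
// tie-break could be slotted in where noted once per-partition key sorting is
// needed. Partition IDs are small and non-negative, so subtraction cannot overflow.
def partitionComparator[K, C]: Comparator[((Int, K), C)] = new Comparator[((Int, K), C)] {
  override def compare(a: ((Int, K), C), b: ((Int, K), C)): Int = {
    val partitionDiff = a._1._1 - b._1._1
    // if partitionDiff == 0, compare a._1._2 and b._1._2 with an Ordering[K] here
    partitionDiff
  }
}
'''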

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15148086
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer.getOrElse(null))
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. 
Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can 
potentially increase
+  // the size of a merged element when we add values with the same key, 
it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // For now, just compare them by partition; later we can compare by key 
as well
+  private val comparator = new Comparator[((Int, K), C)] {
+override def compare(a: ((Int, K), C), b: ((Int, K), C)): Int = {
+  a._1._1 - 

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49541073
  
QA results for PR 1499:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
  trait SizeTrackingCollection[T] extends Iterable[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16864/consoleFull




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49541476
  
QA results for PR 1499:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
  trait SizeTrackingCollection[T] extends Iterable[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16865/consoleFull




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49555343
  
QA tests have started for PR 1499. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16878/consoleFull




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49558215
  
QA results for PR 1499:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
  trait SizeTrackingCollection[T] extends Iterable[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16878/consoleFull




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15152201
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer.getOrElse(null))
--- End diff --

it appears that you could just do `Serializer.getSerializer(serializer)`, 
thanks to code you added last month :)




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15152216
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer.getOrElse(null))
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
--- End diff --

It appears that we'll either use the map or the buffer throughout the whole 
sorting process (rather than switch from one to another). In this case, would 
it make sense to put a `val usingMap/shouldCombine = aggregator.isDefined` out 
here, and then avoid passing it around elsewhere?
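
A minimal, self-contained sketch of the shape of that suggestion (the class and 
field names are illustrative, not the PR's code): decide the structure once in a 
val and branch on it, instead of re-deriving it from the Option at each call site.
'''
import scala.collection.mutable

// Generic shape of the suggestion: `usingMap` is fixed up front from the
// aggregator, and the insert path just branches on it.
class SorterShape[K, V, C](aggregator: Option[(C, V) => C], createCombiner: V => C) {
  private val usingMap = aggregator.isDefined   // decided once, up front

  private val map = mutable.Map.empty[K, C]
  private val buffer = mutable.ArrayBuffer.empty[(K, C)]

  def insert(key: K, value: V): Unit = {
    if (usingMap) {
      val merge = aggregator.get
      map(key) = map.get(key).map(merge(_, value)).getOrElse(createCombiner(value))
    } else {
      buffer += ((key, value.asInstanceOf[C]))  // no combining: C must equal V
    }
  }
}
'''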




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15152231
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
--- End diff --

Let's make sure we have at least a comment saying this in the code, if the 
PR is to be merged while that's true.




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15152282
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer.getOrElse(null))
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. 
Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can 
potentially increase
+  // the size of a merged element when we add values with the same key, 
it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // For now, just compare them by partition; later we can compare by key 
as well
+  private val comparator = new Comparator[((Int, K), C)] {
+override def compare(a: ((Int, K), C), b: ((Int, K), C)): Int = {
+  
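The quoted comparator is cut off just above. As a rough, hedged sketch of what a compare-by-partition-only comparator looks like (names and body are illustrative, not necessarily the PR's exact code):

```scala
import java.util.Comparator

object PartitionComparatorSketch {
  // Order ((partitionId, key), combiner) records by partition ID only, ignoring the key;
  // this is enough to group each partition's records together in the output file.
  def partitionComparator[K, C]: Comparator[((Int, K), C)] = new Comparator[((Int, K), C)] {
    override def compare(a: ((Int, K), C), b: ((Int, K), C)): Int = {
      val p1 = a._1._1
      val p2 = b._1._1
      if (p1 < p2) -1 else if (p1 == p2) 0 else 1
    }
  }
}
```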

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49561754
  
QA tests have started for PR 1499. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16882/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49564210
  
QA results for PR 1499:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
trait SizeTrackingCollection[T] extends Iterable[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16882/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49564761
  
QA tests have started for PR 1499. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16885/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15153139
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer.getOrElse(null))
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
--- End diff --

I left it this way because we might want to disable partial aggregation 
partway through if we see it's not aggregating too much.
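
To make that idea concrete, here is a minimal, hypothetical sketch (my illustration, not code from this PR) of what "disabling partial aggregation partway through" could look like: combine into a map while it is actually collapsing keys, and fall back to buffering raw records once the aggregation ratio looks poor.

```scala
import scala.collection.mutable

// Hypothetical illustration only (not code from this PR): combine into a map while
// aggregation is actually collapsing keys, and fall back to plain buffering once
// nearly every record produces a distinct key.
class SwitchableCombiner[K, C](mergeCombiners: (C, C) => C) {
  private val map = mutable.HashMap.empty[(Int, K), C]
  private val buffer = mutable.ArrayBuffer.empty[((Int, K), C)]
  private var combining = true
  private var recordsRead = 0L

  def insert(partition: Int, key: K, value: C): Unit = {
    recordsRead += 1
    if (combining) {
      val k = (partition, key)
      map(k) = map.get(k).map(mergeCombiners(_, value)).getOrElse(value)
      // If aggregation is barely reducing the data after a reasonable sample,
      // stop combining and just append records from here on.
      if (recordsRead >= 10000 && map.size > 0.9 * recordsRead) {
        buffer ++= map
        map.clear()
        combining = false
      }
    } else {
      val record = ((partition, key), value)
      buffer += record
    }
  }

  def iterator: Iterator[((Int, K), C)] =
    if (combining) map.iterator else buffer.iterator
}
```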


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15153152
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer.getOrElse(null))
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. 
Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can 
potentially increase
+  // the size of a merged element when we add values with the same key, 
it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // For now, just compare them by partition; later we can compare by key 
as well
+  private val comparator = new Comparator[((Int, K), C)] {
+override def compare(a: ((Int, K), C), b: ((Int, K), C)): Int = {
+  a._1._1 

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49565532
  
I've now updated this to support partial aggregation across spilled files, even when we 
don't have an Ordering, using hash-code comparison similar to ExternalAppendOnlyMap. 
It also now fully implements the behavior in the docs, namely sorting the data if you 
pass an Ordering, etc.

It looks like Aaron found a problem with the size-tracking code -- I'll try to fix that 
in ExternalAppendOnlyMap as well. Once this class is in, it could replace 
ExternalAppendOnlyMap in most use cases; its one downside is that it creates another 
object for each key of the in-memory collection (since we hold `((Int, K), C)` pairs to 
allow sorting by partition). It might still be worth it long-term. On the flip side, I 
think the hash-based merging code here is more efficient, avoiding a bunch of 
`ArrayBuffer.remove` calls.
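
For reference, the hash-code comparison mentioned above can be sketched roughly as follows (an approximation of the approach, not the PR's exact code):

```scala
import java.util.Comparator

object HashComparatorSketch {
  // A hedged sketch of the hash-code comparison described above: it yields only a
  // partial ordering, because unequal keys can share a hash code, so the merge still
  // has to check real key equality within each run of equal hashes.
  def hashComparator[K]: Comparator[K] = new Comparator[K] {
    override def compare(a: K, b: K): Int = {
      val h1 = if (a == null) 0 else a.hashCode()
      val h2 = if (b == null) 0 else b.hashCode()
      if (h1 < h2) -1 else if (h1 == h2) 0 else 1
    }
  }
}
```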


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49565536
  
QA tests have started for PR 1499. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16886/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49568383
  
QA results for PR 1499:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
trait SizeTrackingCollection[T] extends Iterable[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16886/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15154006
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use when spilling to disk
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. 
Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can 
potentially increase
+  // the size of a merged element when we add values with the same key, 
it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // A comparator for keys K that orders them within a partition to allow 
partial aggregation.
+  // Can be a partial ordering by hash code if a total ordering is not 
provided through by the

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15154080
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use when spilling to disk
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. 
Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can 
potentially increase
+  // the size of a merged element when we add values with the same key, 
it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // A comparator for keys K that orders them within a partition to allow 
partial aggregation.
+  // Can be a partial ordering by hash code if a total ordering is not 
provided through by the

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15154163
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use when spilling to disk
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. 
Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can 
potentially increase
+  // the size of a merged element when we add values with the same key, 
it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // A comparator for keys K that orders them within a partition to allow 
partial aggregation.
+  // Can be a partial ordering by hash code if a total ordering is not 
provided through by the

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15154182
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use when spilling to disk
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. 
Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can 
potentially increase
+  // the size of a merged element when we add values with the same key, 
it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // A comparator for keys K that orders them within a partition to allow 
partial aggregation.
+  // Can be a partial ordering by hash code if a total ordering is not 
provided through by the

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15154203
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use when spilling to disk
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. 
Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can 
potentially increase
+  // the size of a merged element when we add values with the same key, 
it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // A comparator for keys K that orders them within a partition to allow 
partial aggregation.
+  // Can be a partial ordering by hash code if a total ordering is not 
provided through by the

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15154224
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use when spilling to disk
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. 
Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can 
potentially increase
+  // the size of a merged element when we add values with the same key, 
it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // A comparator for keys K that orders them within a partition to allow 
partial aggregation.
+  // Can be a partial ordering by hash code if a total ordering is not 
provided through by the

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15154239
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
--- End diff --

Perhaps note that the ordering is assumed total -- it's probably obvious, 
but since we support partial ordering in this class, may not hurt to mention it
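
One hedged way to phrase that in the scaladoc (wording is illustrative, not taken from the PR):

```scala
/**
 * @param ordering optional ordering to sort keys within each partition; assumed to be
 *                 a total ordering (hash-based partial ordering is only used internally
 *                 when no ordering is supplied)
 */
```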


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15154289
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use when spilling to disk
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. 
Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can 
potentially increase
+  // the size of a merged element when we add values with the same key, 
it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // A comparator for keys K that orders them within a partition to allow 
partial aggregation.
+  // Can be a partial ordering by hash code if a total ordering is not 
provided through by the

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15154344
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use when spilling to disk
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. 
Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can 
potentially increase
+  // the size of a merged element when we add values with the same key, 
it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // A comparator for keys K that orders them within a partition to allow 
partial aggregation.
+  // Can be a partial ordering by hash code if a total ordering is not 
provided through by the

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15154387
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use when spilling to disk
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions  1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt(spark.shuffle.file.buffer.kb, 
100) * 1024
+  private val serializerBatchSize = 
conf.getLong(spark.shuffle.spill.batchSize, 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. 
Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can 
potentially increase
+  // the size of a merged element when we add values with the same key, 
it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+val memoryFraction = conf.getDouble(spark.shuffle.memoryFraction, 
0.3)
+val safetyFraction = conf.getDouble(spark.shuffle.safetyFraction, 
0.8)
+(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // A comparator for keys K that orders them within a partition to allow 
partial aggregation.
+  // Can be a partial ordering by hash code if a total ordering is not 
provided through by the
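
With the defaults shown above, the collective spill threshold works out to memoryFraction * safetyFraction = 0.3 * 0.8 = 24% of the executor heap. The small standalone sketch below just reproduces that arithmetic outside of Spark; the 4 GB heap size is invented for illustration and is not part of the patch.

object MemoryThresholdSketch {
  def main(args: Array[String]): Unit = {
    // Same arithmetic as maxMemoryThreshold in the quoted code, with the
    // defaults shown there (0.3 and 0.8) and an assumed 4 GB heap.
    val memoryFraction = 0.3
    val safetyFraction = 0.8
    val maxMemory = 4L * 1024 * 1024 * 1024
    val threshold = (maxMemory * memoryFraction * safetyFraction).toLong
    println(s"shared spill threshold: $threshold bytes")  // roughly 0.96 GB
  }
}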

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15154394
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15154433
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15154451
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15154478
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15154485
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15154509
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15154525
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15154547
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
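
The class doc comment quoted earlier in the thread says that when a partitioner is given, records are ordered by partition ID and then by key. Stripped of the ExternalSorter machinery, that ordering over composite (partitionId, key) keys looks like the toy example below; the data and object name are invented for illustration and the actual sorting code is not visible in the quoted excerpt.

import scala.collection.mutable.ArrayBuffer

object PartitionThenKeySketch {
  def main(args: Array[String]): Unit = {
    // Records keyed by (partitionId, key), mirroring the (Int, K) keys that the
    // quoted map and buffer use; the integer values stand in for combiners.
    val records = ArrayBuffer(
      ((1, "b"), 10), ((0, "z"), 20), ((1, "a"), 30), ((0, "c"), 40))

    // Order by partition ID first, then by key within each partition.
    val sorted = records.sortBy { case ((partition, key), _) => (partition, key) }

    // Prints ((0,c),40), ((0,z),20), ((1,a),30), ((1,b),10), one per line.
    sorted.foreach(println)
  }
}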