[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50115353

Running tests with `export SPARK_JAVA_OPTS=-Dspark.shuffle.manager=org.apache.spark.shuffle.sort.SortShuffleManager` causes:

```
- sorting using mutable pairs *** FAILED ***
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 3.0:1 failed 4 times, most recent failure: Exception failure in TID 14 on host localhost: java.lang.ArrayStoreException: scala.Tuple2
          scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
          scala.Array$.slowcopy(Array.scala:81)
          scala.Array$.copy(Array.scala:107)
          scala.collection.mutable.ResizableArray$class.copyToArray(ResizableArray.scala:77)
          scala.collection.mutable.ArrayBuffer.copyToArray(ArrayBuffer.scala:47)
          scala.collection.TraversableOnce$class.copyToArray(TraversableOnce.scala:241)
          scala.collection.AbstractTraversable.copyToArray(Traversable.scala:105)
          scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:249)
          scala.collection.AbstractTraversable.toArray(Traversable.scala:105)
          scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
          org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
          org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:62)
          org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:61)
          org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:581)
          org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:581)
          org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
          org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
          org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
          org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:112)
          org.apache.spark.scheduler.Task.run(Task.scala:51)
          org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:199)
          java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
          java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
          java.lang.Thread.run(Thread.java:745)
  Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1055)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1039)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1037)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1037)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:641)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:641)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:641)
  ...
```

The actual classes are:

```
classes = [class scala.Tuple2] ; [class scala.Tuple2, class scala.Tuple2, class scala.Tuple2]
```

obtained using:

```
def collect(): Array[T] = {
  println("classes = " + sc.runJob(this, (iter: Iterator[T]) =>
    iter.map(v => if (v == null) null else v.getClass).mkString("[", ",\n\t\t", "]")).
    mkString("", "; \n\t", ""))
  System.out.flush()
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
```

in RDD.collect.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50115453

BTW, this is one of 5 failures in core. I hope there are no merge issues, though.
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50115862

@mridulm that should've been fixed recently in https://github.com/mateiz/spark/commit/9c299579f13f004f5fd1f4dd0b98b7d76cac2a55, which got rid of custom return types in ShuffledRDD for this reason (see the message there). Maybe you need to do a clean rebuild. In an earlier commit I also added a version of ShuffleSuite that runs all those tests with sort-based shuffle.
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15388684

--- Diff: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.io.{DataInputStream, FileInputStream}
+
+import org.apache.spark.shuffle._
+import org.apache.spark.{TaskContext, ShuffleDependency}
+import org.apache.spark.shuffle.hash.HashShuffleReader
+import org.apache.spark.storage.{DiskBlockManager, FileSegment, ShuffleBlockId}
+
+private[spark] class SortShuffleManager extends ShuffleManager {
+  /**
+   * Register a shuffle with the manager and obtain a handle for it to pass to tasks.
+   */
+  override def registerShuffle[K, V, C](
+      shuffleId: Int,
+      numMaps: Int,
+      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
+    new BaseShuffleHandle(shuffleId, numMaps, dependency)
+  }
+
+  /**
+   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
+   * Called on executors by reduce tasks.
+   */
+  override def getReader[K, C](
+      handle: ShuffleHandle,
+      startPartition: Int,
+      endPartition: Int,
+      context: TaskContext): ShuffleReader[K, C] = {
+    // We currently use the same block store shuffle fetcher as the hash-based shuffle.
+    new HashShuffleReader(
+      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
+  }
+
+  /** Get a writer for a given partition. Called on executors by map tasks. */
+  override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
+      : ShuffleWriter[K, V] = {
+    new SortShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
+  }
+
+  /** Remove a shuffle's metadata from the ShuffleManager. */
+  override def unregisterShuffle(shuffleId: Int): Unit = {}
--- End diff --

Alright, fixed this in the latest commit.
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50116492

Ah, thanks! Rerunning with 9c29957. I can't pull the PR, and merging manually is painful, hence the delays in testing :-)
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50159041

(Replying to Mridul Muralidharan, Jul 25, 2014 12:25 AM.)

Not sure if it helps, but you can get PRs via something like `git fetch apache refs/pull/1499/head`. Easier workflow, perhaps.
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50160338

After installing `hub` (https://hub.github.com/), you can also do a bunch of new things on the command line, including `hub checkout https://github.com/apache/spark/pull/1499`.
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50210371

QA tests have started for PR 1499. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17202/consoleFull
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50213171

QA results for PR 1499:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
class ShuffledRDD[K, V, C](
case class ShuffleIndexBlockId(shuffleId: Int, mapId: Int, reduceId: Int)

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17202/consoleFull
Github user colorant commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15331341

--- Diff: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala ---
@@ -0,0 +1,80 @@
[Same new-file hunk as quoted above, through:]
+  override def unregisterShuffle(shuffleId: Int): Unit = {}
--- End diff --

The shuffle output files written by SortShuffleWriter don't get cleaned up. We might need to add a map that saves the registered shuffle handles, and then remove the data files in the unregisterShuffle method. At present this isn't implemented in HashShuffleManager either, but HashShuffleManager depends on ShuffleBlockManager, and the files get cleaned there.

I have a PR, #1241, that generalizes ShuffleBlockManager and hides it behind ShuffleManager; with it, when BlockManager removes a shuffle, it calls into this unregisterShuffle method. I'll rebase it once this PR is merged. Should we fix this issue in my PR, or add the logic to store and clean up shuffle handles here in this PR first?
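colorant's suggestion (keep a per-shuffle record of the files map tasks wrote, and delete them in `unregisterShuffle`) can be sketched without any Spark classes. This is an illustrative sketch only, not the PR's code: `ShuffleFileTracker` and `recordOutput` are hypothetical names, and real executors would learn file locations from the block manager rather than by explicit registration.

```scala
import java.io.File
import scala.collection.mutable

// Hypothetical per-executor bookkeeping; not part of Spark's API.
class ShuffleFileTracker {
  // shuffleId -> files written by map tasks of that shuffle
  private val filesByShuffle = mutable.HashMap.empty[Int, List[File]]

  /** Remember a file produced by a map task of `shuffleId`. */
  def recordOutput(shuffleId: Int, file: File): Unit = synchronized {
    filesByShuffle(shuffleId) = file :: filesByShuffle.getOrElse(shuffleId, Nil)
  }

  /**
   * Forget the shuffle and delete its files. Returns true if every file is gone afterwards.
   * Deletion happens outside the lock so slow disks don't block concurrent recordOutput calls.
   */
  def unregisterShuffle(shuffleId: Int): Boolean = {
    val files = synchronized { filesByShuffle.remove(shuffleId).getOrElse(Nil) }
    // map (not forall) so that every deletion is attempted even if one fails
    files.map(f => !f.exists() || f.delete()).forall(identity)
  }
}
```

Calling `unregisterShuffle` twice is harmless: the second call finds no entry and returns true.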
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15332387

--- Diff: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala ---
@@ -0,0 +1,80 @@
[Same new-file hunk as quoted above, through:]
+  override def unregisterShuffle(shuffleId: Int): Unit = {}
--- End diff --

Good question, but files on disk actually get cleaned by BlockManager directly if cleaning is turned on, and sort-based shuffle is set up to only use files. ShuffleBlockManager has extra in-memory state in the form of metadata; that's why it needs its own cleaner. So I don't think there's an issue here.

It will be important to do something in unregisterShuffle, but right now nothing calls it. It's part of the API because I wanted to move the MapOutputTracker behind the ShuffleManager as well. Any patches that move more things behind it are welcome, though, and I agree that ShuffleBlockManager should be specific to hash-based shuffle.
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49973755

Added one more commit that fixes the type of ShuffledRDD: with this new shuffle it's not possible to return a custom Product2 the way the code is written now, and with the old one it wasn't possible anyway when aggregating. See https://github.com/mateiz/spark/commit/9c299579f13f004f5fd1f4dd0b98b7d76cac2a55 for details. @rxin, you should look at this one. With this commit there's now a version of ShuffleSuite that passes using sort-based shuffle.
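The `ArrayStoreException: scala.Tuple2` reported earlier in this thread (in the "sorting using mutable pairs" test) appears to be the failure mode this commit describes: the RDD's `ClassTag` says its elements are a custom `Product2` class, the shuffle actually yields `Tuple2`s, and `toArray` allocates an array of the custom class that the JVM refuses to store a `Tuple2` into. A minimal, Spark-free sketch of that mechanism, assuming a hypothetical `MyPair` class in place of Spark's `MutablePair` and a hypothetical `collectAs` helper in place of the RDD machinery:

```scala
import scala.reflect.ClassTag

// Hypothetical stand-in for a mutable pair class such as Spark's MutablePair.
class MyPair[K, V](var _1: K, var _2: V) extends Product2[K, V] {
  override def canEqual(that: Any): Boolean = that.isInstanceOf[MyPair[_, _]]
}

object ArrayStoreDemo {
  /**
   * Mimics an RDD whose declared element type is P while its iterator actually yields other
   * Product2 instances (e.g. Tuple2). The cast is unchecked because of erasure, so nothing fails
   * until toArray allocates an Array[P] from the ClassTag and the JVM rejects the first store.
   */
  def collectAs[P <: Product2[Int, Int]: ClassTag](it: Iterator[Product2[Int, Int]]): Array[P] =
    it.asInstanceOf[Iterator[P]].toArray[P]

  def main(args: Array[String]): Unit = {
    try {
      collectAs[MyPair[Int, Int]](Iterator((1, 2), (3, 4)))
      println("no exception")
    } catch {
      // The array's component type is MyPair, but the elements are Tuple2.
      case e: ArrayStoreException => println(s"ArrayStoreException: ${e.getMessage}")
    }
  }
}
```

When the declared type matches what the iterator really produces (for example `collectAs[(Int, Int)]`), the same call succeeds. That is why dropping the custom element type from ShuffledRDD sidesteps the problem.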
Github user colorant commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15333148

--- Diff: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala ---
@@ -0,0 +1,80 @@
[Same new-file hunk as quoted above, through:]
+  override def unregisterShuffle(shuffleId: Int): Unit = {}
--- End diff --

Maybe I'm missing some code, but from what I understand, when cleaning is turned on: with the timestamp-based approach, BlockManager won't remove shuffle data; ShuffleBlockManager does that work by itself. With the automatic cleanup approach, when doCleanShuffle is called, it goes through the current ShuffleBlockManager interface. So in neither case will the files generated by SortShuffleWriter be cleaned, right? Or do you mean that, as long as there is no metadata in memory, it's OK for shuffle files on disk not to be removed until the application exits?
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15382131

--- Diff: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala ---
@@ -0,0 +1,80 @@
[Same new-file hunk as quoted above, through:]
+  override def unregisterShuffle(shuffleId: Int): Unit = {}
--- End diff --

Ah, I see, these files are never registered with the BlockManager, so they won't be cleaned by it. I'll modify ShuffleBlockManager to clean them for now.
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15383498

--- Diff: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.io.{BufferedOutputStream, File, FileOutputStream, DataOutputStream}
+
+import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext}
+import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.{ShuffleWriter, BaseShuffleHandle}
+import org.apache.spark.storage.ShuffleBlockId
+import org.apache.spark.util.collection.ExternalSorter
+
+private[spark] class SortShuffleWriter[K, V, C](
+    handle: BaseShuffleHandle[K, V, C],
+    mapId: Int,
+    context: TaskContext)
+  extends ShuffleWriter[K, V] with Logging {
+
+  private val dep = handle.dependency
+  private val numPartitions = dep.partitioner.numPartitions
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+
+  private var sorter: ExternalSorter[K, V, _] = null
+  private var outputFile: File = null
+
+  private var stopping = false
+  private var mapStatus: MapStatus = null
+
+  /** Write a bunch of records to this task's output */
+  override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
+    val partitions: Iterator[(Int, Iterator[Product2[K, _]])] = {
+      if (dep.mapSideCombine) {
+        if (!dep.aggregator.isDefined) {
+          throw new IllegalStateException("Aggregator is empty for map-side combine")
+        }
+        sorter = new ExternalSorter[K, V, C](
+          dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
+        sorter.write(records)
+        sorter.partitionedIterator
+      } else {
+        // In this case we pass neither an aggregator nor an ordering to the sorter, because we
+        // don't care whether the keys get sorted in each partition; that will be done on the
+        // reduce side if the operation being run is sortByKey.
+        sorter = new ExternalSorter[K, V, V](
+          None, Some(dep.partitioner), None, dep.serializer)
+        sorter.write(records)
+        sorter.partitionedIterator
+      }
+    }
+
+    // Create a single shuffle file with reduce ID 0 that we'll write all results to. We'll later
+    // serve different ranges of this file using an index file that we create at the end.
+    val blockId = ShuffleBlockId(dep.shuffleId, mapId, 0)
+    outputFile = blockManager.diskBlockManager.getFile(blockId)
+
+    // Track location of each range in the output file
+    val offsets = new Array[Long](numPartitions + 1)
+    val lengths = new Array[Long](numPartitions)
+
+    // Statistics
+    var totalBytes = 0L
+    var totalTime = 0L
+
+    for ((id, elements) <- partitions) {
+      if (elements.hasNext) {
+        val writer = blockManager.getDiskWriter(blockId, outputFile, ser, fileBufferSize)
+        for (elem <- elements) {
+          writer.write(elem)
+        }
+        writer.commit()
+        writer.close()
+        val segment = writer.fileSegment()
+        offsets(id + 1) = segment.offset + segment.length
+        lengths(id) = segment.length
+        totalTime += writer.timeWriting()
+        totalBytes += segment.length
+      } else {
+        // Don't create a new writer to avoid writing any headers and things like that
+        offsets(id + 1) = offsets(id)
+      }
+    }
+
+    val
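The comments in this hunk describe writing every partition into one file and serving per-partition byte ranges through an index file created at the end. The hunk is cut off before the index is written, so its actual format isn't shown here. The following is a minimal sketch that assumes a hypothetical index layout of `numPartitions + 1` longs holding cumulative offsets, which mirrors the `offsets` array above. It uses in-memory streams instead of files, and `PartitionIndexSketch` and its methods are illustrative names only.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object PartitionIndexSketch {
  /** offsets(i) is where partition i starts; offsets(n) is the total number of bytes written. */
  def offsetsFrom(lengths: Array[Long]): Array[Long] = lengths.scanLeft(0L)(_ + _)

  /** Serialize the offsets as consecutive 8-byte longs (a hypothetical index-file layout). */
  def writeIndex(offsets: Array[Long]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    try offsets.foreach(o => out.writeLong(o)) finally out.close()
    bytes.toByteArray
  }

  /** Return (offset, length) of one reduce partition by reading two adjacent index entries. */
  def rangeFor(index: Array[Byte], reduceId: Int): (Long, Long) = {
    val in = new DataInputStream(new ByteArrayInputStream(index))
    try {
      in.skipBytes(reduceId * 8) // jump to entry `reduceId`; each entry is one long
      val start = in.readLong()
      val end = in.readLong()
      (start, end - start)
    } finally in.close()
  }
}
```

An empty partition (the `else` branch in the hunk, which writes nothing) simply gets a zero-length range. With lengths `10, 0, 5`, the offsets are `0, 10, 10, 15`, so partition 1 maps to `(10, 0)` and partition 2 maps to `(10, 5)`.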
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15274240

--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
```diff
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. Can output a single
+ * partitioned file with a different byte range for each partition, suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for merging data
+ * @param partitioner optional Partitioner; if given, sort by partition ID and then key
+ * @param ordering optional Ordering to sort keys within each partition; should be a total ordering
+ * @param serializer serializer to use when spilling to disk
+ *
+ * Note that if an Ordering is given, we'll always sort using it, so only provide it if you really
+ * want the output keys to be sorted. In a map task without map-side combine for example, you
+ * probably want to pass None as the ordering to avoid extra sorting. On the other hand, if you do
+ * want to do combining, having an Ordering is more efficient than not having it.
+ *
+ * At a high level, this class works as follows:
+ *
+ * - We repeatedly fill up buffers of in-memory data, using either a SizeTrackingAppendOnlyMap if
+ *   we want to combine by key, or an simple SizeTrackingBuffer if we don't. Inside these buffers,
+ *   we sort elements of type ((Int, K), C) where the Int is the partition ID. This is done to
+ *   avoid calling the partitioner multiple times on the same key (e.g. for RangePartitioner).
+ *
+ * - When each buffer reaches our memory limit, we spill it to a file. This file is sorted first
+ *   by partition ID and possibly second by key or by hash code of the key, if we want to do
+ *   aggregation. For each file, we track how many objects were in each partition in memory, so we
+ *   don't have to write out the partition ID for every element.
+ *
+ * - When the user requests an iterator, the spilled files are merged, along with any remaining
+ *   in-memory data, using the same sort order defined above (unless both sorting and aggregation
+ *   are disabled). If we need to aggregate by key, we either use a total ordering from the
+ *   ordering parameter, or read the keys with the same hash code and compare them with each other
+ *   for equality to merge values.
+ *
+ * - Users are expected to call stop() at the end to delete all the intermediate files.
+ */
+private[spark] class ExternalSorter[K, V, C](
+    aggregator: Option[Aggregator[K, V, C]] = None,
+    partitioner: Option[Partitioner] = None,
+    ordering: Option[Ordering[K]] = None,
+    serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf =
```
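The doc comment describes three steps:

- Tag each record with its partition ID once.
- Sort by partition ID and then, optionally, by key.
- Track per-partition counts instead of writing the ID next to every element.

The following is a minimal in-memory sketch of that idea. It assumes a plain `Seq` in place of SizeTrackingAppendOnlyMap/SizeTrackingBuffer and does no spilling. `PartitionedSortSketch.sortAndCount` is a hypothetical helper, not the PR's code.

```scala
// Hypothetical sketch (not ExternalSorter itself): sort ((partitionId, key), value)
// tuples by partition ID, then optionally by key, and count elements per partition.
object PartitionedSortSketch {
  def sortAndCount[K, V](
      records: Seq[(K, V)],
      numPartitions: Int,
      getPartition: K => Int,
      keyOrdering: Option[Ordering[K]]): (Seq[(K, V)], Array[Long]) = {
    // Call the partitioner once per record and keep the ID next to the key.
    val tagged: Seq[((Int, K), V)] = records.map { case (k, v) => ((getPartition(k), k), v) }
    // Sort by partition ID first; compare keys only if an Ordering was supplied.
    // sortBy is stable, so equal elements keep their insertion order.
    val sorted = keyOrdering match {
      case Some(ord) => tagged.sortBy(_._1)(Ordering.Tuple2(Ordering.Int, ord))
      case None      => tagged.sortBy(_._1._1)
    }
    // Per-partition counts are enough to recover partition boundaries later,
    // so the ID itself does not need to be stored with each element.
    val counts = new Array[Long](numPartitions)
    sorted.foreach { case ((p, _), _) => counts(p) += 1 }
    (sorted.map { case ((_, k), v) => (k, v) }, counts)
  }
}
```

Prefix sums over `counts` give each partition's start position in the sorted run.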
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user colorant commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15276601 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user colorant commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15277089 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15288486 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15310488 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15310561 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49924823 @colorant @mridulm for the serializing part, one issue is that as we merge streams we already need a heap of O(num streams) objects, so if you're worried about the objects being large, deferring the read won't help tremendously. I'm hoping to fix that by adding a max merge factor and supporting hierarchical merges later, but I think we can do it after this first PR. (If possible I would like to get it in 1.1 too, since it's not that much code. But I don't want to have a giant PR waiting that blocks other things.) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
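mateiz's point is that a k-way merge keeps one buffered element per input stream in its heap. Memory therefore grows with the number of streams, regardless of when values are deserialized. The following is a minimal sketch of such a merge over already-sorted iterators, assuming a total Ordering on the elements. `KWayMergeSketch.mergeSorted` is a hypothetical name, not the PR's code.

```scala
import scala.collection.mutable

// Hypothetical k-way merge sketch: the heap holds at most one buffered head per
// input stream, so its size is bounded by the number of streams.
object KWayMergeSketch {
  def mergeSorted[T](streams: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] = {
    // PriorityQueue is a max-heap, so reverse the ordering to dequeue the smallest head first.
    val heap = mutable.PriorityQueue.empty[BufferedIterator[T]](
      Ordering.by[BufferedIterator[T], T](_.head).reverse)
    streams.map(_.buffered).filter(_.hasNext).foreach(heap.enqueue(_))
    new Iterator[T] {
      def hasNext: Boolean = heap.nonEmpty
      def next(): T = {
        val it = heap.dequeue()
        val elem = it.next()
        // Re-insert the stream (keyed by its new head) only if it has more data.
        if (it.hasNext) heap.enqueue(it)
        elem
      }
    }
  }
}
```

A max merge factor, as mentioned above, would presumably cap how many streams enter one such heap, with intermediate outputs merged again in a later pass.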
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49925284 QA tests have started for PR 1499. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17055/consoleFull
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49938524 QA results for PR 1499:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17055/consoleFull
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49949511 @mateiz The total memory overhead actually goes much higher than num_streams, right? It should be on the order of num_streams + num_values for this key. For fairly large values, the latter might fit into memory, but the former might not (particularly as the number of mappers increases). Or did I get this wrong from the PR?
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49949660 No, we only read one value at a time. If you're doing something like groupByKey, the combine function may create an ArrayBuffer or stuff like that, but that's only for the key actively being read. As soon as one stream moves on to the next key, we read the first key-value pair in it but stop reading others.
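This sketch illustrates the "only one value at a time" answer. When key-sorted streams are merged with combining, each stream contributes only its head pair to the heap. The only state that grows is the combiner for the key currently being merged. The sketch assumes keys have a total Ordering; the PR also handles a hash-code-ordered case, which this does not cover. `MergeCombineSketch.mergeCombine` is hypothetical and is not the PR's implementation.

```scala
import scala.collection.mutable

// Hypothetical sketch: merge key-sorted (K, C) streams, combining values for equal keys.
// The heap holds one pending pair per stream; only `combined` grows, and only for the
// key currently being merged.
object MergeCombineSketch {
  def mergeCombine[K, C](streams: Seq[Iterator[(K, C)]])(mergeCombiners: (C, C) => C)
      (implicit ord: Ordering[K]): Iterator[(K, C)] = {
    // Reverse the key ordering so the max-heap PriorityQueue yields the smallest key first.
    val heap = mutable.PriorityQueue.empty[BufferedIterator[(K, C)]](
      Ordering.by[BufferedIterator[(K, C)], K](_.head._1).reverse)
    streams.map(_.buffered).filter(_.hasNext).foreach(heap.enqueue(_))
    new Iterator[(K, C)] {
      def hasNext: Boolean = heap.nonEmpty
      def next(): (K, C) = {
        val first = heap.dequeue()
        val (key, c0) = first.next()
        var combined = c0
        if (first.hasNext) heap.enqueue(first)
        // Pull further pairs with the same key (from any stream, including `first`)
        // until the smallest head in the heap has moved past this key.
        while (heap.nonEmpty && ord.equiv(heap.head.head._1, key)) {
          val it = heap.dequeue()
          combined = mergeCombiners(combined, it.next()._2)
          if (it.hasNext) heap.enqueue(it)
        }
        (key, combined)
      }
    }
  }
}
```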
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49949874 Let me know if I've missed something. That combining part happens in mergeWithAggregation. Actually for many operations map-side combine is disabled so we don't even do it (and ExternalSorter is not used on the reduce side right now).
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15324369

--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
```diff
@@ -120,8 +124,10 @@ class ExternalAppendOnlyMap[K, V, C](
     val update: (Boolean, C) => C = (hadVal, oldVal) => {
       if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
     }
-    if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
-      val mapSize = currentMap.estimateSize()
+    if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
+        currentMap.estimateSize() >= myMemoryThreshold)
```
--- End diff --

Did you mean the following:

```
if (elementsRead > trackMemoryThreshold && (elementsRead % 32 == 0 || currentMap.estimateSize() > myMemoryThreshold))
```

Otherwise, if say the 33rd element was huge and we had already gone past our current memory threshold, we would have to wait for another 31 elements before we find a new threshold.
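andrewor14's concern is that the size estimate is only consulted on every 32nd element. A huge element that arrives just after a check therefore goes unnoticed until the next multiple of 32. The following small simulation shows that delay. It assumes each element's memory contribution is known exactly, whereas the real code estimates the map's size. `SpillCheckSketch.firstSpillAt` is hypothetical, and `checkInterval = 1` models checking after every element.

```scala
// Hypothetical simulation of the spill check in the diff above. Returns the 1-based
// index of the element after which a spill would first be triggered, if any.
object SpillCheckSketch {
  def firstSpillAt(
      sizes: Seq[Long],
      trackMemoryThreshold: Int,
      memoryThreshold: Long,
      checkInterval: Int): Option[Int] = {
    var currentSize = 0L
    var elementsRead = 0
    var spilledAt: Option[Int] = None
    val it = sizes.iterator
    while (spilledAt.isEmpty && it.hasNext) {
      currentSize += it.next() // exact size here; the real code only estimates it
      elementsRead += 1
      // Same shape as the new condition: elementsRead > trackMemoryThreshold &&
      // elementsRead % 32 == 0 && estimateSize() >= myMemoryThreshold
      if (elementsRead > trackMemoryThreshold &&
          elementsRead % checkInterval == 0 &&
          currentSize >= memoryThreshold) {
        spilledAt = Some(elementsRead)
      }
    }
    spilledAt
  }
}
```

Consider 32 one-unit elements, then a 1000-unit 33rd element, then more one-unit elements, with a threshold of 500 and `trackMemoryThreshold = 0`. In that case `firstSpillAt(sizes, 0, 500L, 32)` returns `Some(64)` while `firstSpillAt(sizes, 0, 500L, 1)` returns `Some(33)`. The cost of checking every element is calling the size estimate far more often.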
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49956204 QA tests have started for PR 1499. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17078/consoleFull
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49961533 QA results for PR 1499:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17078/consoleFull
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49706936 @aarondav pushed an update to the grow code as well, which will now use estimateSize. I implemented the same thing in EAOM (ExternalAppendOnlyMap). @colorant fixed those, thanks.
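Judging from the diffs in this thread, the grow code decides, once a task's in-memory collection passes its current threshold, whether the task may raise that threshold or must spill. Below is a hypothetical sketch of one way to make that decision against a pool shared by all running tasks. `MemoryPool`, `tryGrow`, `release` and the "claim twice the current size" rule are assumptions for illustration, not the PR's code; only the pool limit reuses the `heap * spark.shuffle.memoryFraction (0.3) * spark.shuffle.safetyFraction (0.8)` formula from the ExternalSorter diff in this thread.

```scala
import scala.collection.mutable

object ShuffleMemorySketch {
  // Same formula as maxMemoryThreshold in the ExternalSorter diff:
  // heap * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction.
  def maxMemoryThreshold(
      maxHeap: Long,
      memoryFraction: Double = 0.3,
      safetyFraction: Double = 0.8): Long =
    (maxHeap * memoryFraction * safetyFraction).toLong

  /** Hypothetical pool shared by all running tasks: task id -> bytes claimed. */
  final class MemoryPool(val limit: Long) {
    private val claimed = mutable.Map.empty[Long, Long]

    /**
     * Assumed policy: try to raise `taskId`'s claim to twice `currentSize`.
     * Returns the new threshold, or None if the other tasks' claims leave too
     * little room, in which case the caller should spill instead.
     */
    def tryGrow(taskId: Long, currentSize: Long): Option[Long] = synchronized {
      // The task's own previous claim does not count against it.
      val others = claimed.values.sum - claimed.getOrElse(taskId, 0L)
      val wanted = 2 * currentSize
      if (limit - others >= wanted) {
        claimed(taskId) = wanted
        Some(wanted)
      } else {
        None
      }
    }

    /** Drops the task's claim, e.g. after it spills or finishes. */
    def release(taskId: Long): Unit = synchronized { claimed.remove(taskId) }
  }
}
```

For a 4 GiB heap the formula gives 4294967296 * 0.3 * 0.8, which truncates to 1030792151 bytes shared by all tasks. Excluding a task's own earlier claim when it asks again means a task can keep growing as long as the others leave room.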
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49707121 QA tests have started for PR 1499. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16948/consoleFull
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49715947 QA results for PR 1499:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16948/consoleFull
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49788955 QA tests have started for PR 1499. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16983/consoleFull
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49800606 QA tests have started for PR 1499. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16990/consoleFull
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49802023 QA results for PR 1499:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16983/consoleFull
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49804353 We saw a bunch of EOF Exceptions from SpillReader.

java.io.EOFException
    at java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2577)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1315)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:79)
    at org.apache.spark.util.collection.ExternalSorter$SpillReader.org$apache$spark$util$collection$ExternalSorter$SpillReader$$readNextItem(ExternalSorter.scala:526)
    at org.apache.spark.util.collection.ExternalSorter$SpillReader$$anon$6.hasNext(ExternalSorter.scala:560)
    at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:432)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.shuffle.sort.SortShuffleWriter$$anonfun$write$2.apply(SortShuffleWriter.scala:93)
    at org.apache.spark.shuffle.sort.SortShuffleWriter$$anonfun$write$2.apply(SortShuffleWriter.scala:89)

(The exact line numbers might not match master, but I hope this conveys the issue.)
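For context on this trace: `ObjectInputStream.readObject` signals the end of the stream by throwing `java.io.EOFException` from `peekByte`, so a reader that asks for more objects than were written fails in exactly this way. The sketch below shows that behaviour in isolation. It is not a diagnosis of the SpillReader bug, and `SpillReadSketch`, `writeItems` and `readItems` are hypothetical names. It only illustrates why a spill reader needs an accurate count of what was written (the `spark.shuffle.spill.batchSize` setting in the ExternalSorter diff suggests spills are written in batches) rather than reading until the stream runs out.

```scala
import java.io._

object SpillReadSketch {
  // Serialize items back to back with plain Java serialization, as a stand-in
  // for a spill file.
  def writeItems(items: Seq[(Int, String)]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    items.foreach(item => out.writeObject(item))
    out.close()
    bytes.toByteArray
  }

  // Read exactly `count` objects. Asking for more than were written makes
  // readObject throw java.io.EOFException.
  def readItems(data: Array[Byte], count: Int): Seq[AnyRef] = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try Seq.fill(count)(in.readObject()) finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val data = writeItems(Seq((1, "a"), (2, "b")))
    println(readItems(data, 2)) // List((1,a), (2,b))
    try readItems(data, 3)
    catch { case _: EOFException => println("EOFException after the last item") }
  }
}
```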
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15259118

--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. Can output a single
+ * partitioned file with a different byte range for each partition, suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use when spilling to disk
+ */
+private[spark] class ExternalSorter[K, V, C](
+    aggregator: Option[Aggregator[K, V, C]] = None,
+    partitioner: Option[Partitioner] = None,
+    ordering: Option[Ordering[K]] = None,
+    serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+    if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can potentially increase
+  // the size of a merged element when we add values with the same key, it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+    val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+    val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+    (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // A comparator for keys K that orders them within a partition to allow partial aggregation.
+  // Can be a partial ordering by hash code if a total ordering is not provided through by the
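The class comment in this diff says keys are first grouped by partition and then optionally sorted, and the last comment line mentions falling back to a partial ordering by hash code when no total ordering is provided. A minimal sketch of such a comparator over the `(partitionId, key)` pairs that `SizeTrackingAppendOnlyMap[(Int, K), C]` stores might look like this; `PartitionKeyOrderingSketch` and `partitionKeyComparator` are hypothetical names, not the PR's code.

```scala
import java.util.Comparator

object PartitionKeyOrderingSketch {
  /**
   * Orders (partitionId, key) pairs by partition id first, then by `ordering`
   * if one is given, otherwise by the keys' hash codes. The hash-code fallback
   * is only a partial ordering: distinct keys with equal hash codes compare as 0.
   */
  def partitionKeyComparator[K](ordering: Option[Ordering[K]]): Comparator[(Int, K)] = {
    // scala.math.Ordering extends java.util.Comparator, so it can be used directly.
    val keyComparator: Comparator[K] = ordering match {
      case Some(ord) => ord
      case None =>
        new Comparator[K] {
          override def compare(a: K, b: K): Int =
            Integer.compare(a.hashCode(), b.hashCode())
        }
    }
    new Comparator[(Int, K)] {
      override def compare(a: (Int, K), b: (Int, K)): Int = {
        val byPartition = Integer.compare(a._1, b._1)
        if (byPartition != 0) byPartition else keyComparator.compare(a._2, b._2)
      }
    }
  }
}
```

For example, with `Some(Ordering.String)` the pairs `(1, "b"), (0, "z"), (1, "a"), (0, "c")` sort to `(0, "c"), (0, "z"), (1, "a"), (1, "b")`. Because hash codes can collide, two different keys may compare as equal under the fallback, so any merging step built on it would still have to check keys for equality before combining them.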
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15259190 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala --- @@ -0,0 +1,573 @@
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49811607 QA results for PR 1499:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16990/consoleFull
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49826834 @mridulm which version of the code was that with? Right now line 526 of ExternalSorter is not calling readObject, so it's hard to debug. There might've been some fixes to this before too.
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49833579 I had pulled about 20 minutes after I mailed you... I have elaborated inline in the code on why this occurs. We can ignore it for now, though, since it happens even in the 'regular' case: we had fixed it so long ago that I had forgotten about it and assumed it was some other issue! We can punt on resolving this for now and address it when the consolidated shuffle PR comes out.
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15271480 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala --- @@ -0,0 +1,573 @@
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15192915 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala --- @@ -0,0 +1,573 @@
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15192894 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala --- @@ -0,0 +1,573 @@
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15193928 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala --- @@ -0,0 +1,573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import java.io._ +import java.util.Comparator + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable + +import com.google.common.io.ByteStreams + +import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner} +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage.BlockId + +/** + * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner + * pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then + * optionally sorts keys within each partition using a custom Comparator. Can output a single + * partitioned file with a different byte range for each partition, suitable for shuffle fetches. + * + * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
+ * + * @param aggregator optional Aggregator with combine functions to use for merging data + * @param partitioner optional partitioner; if given, sort by partition ID and then key + * @param ordering optional ordering to sort keys within each partition + * @param serializer serializer to use when spilling to disk + */ +private[spark] class ExternalSorter[K, V, C]( +aggregator: Option[Aggregator[K, V, C]] = None, +partitioner: Option[Partitioner] = None, +ordering: Option[Ordering[K]] = None, +serializer: Option[Serializer] = None) extends Logging { + + private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1) + private val shouldPartition = numPartitions > 1 + + private val blockManager = SparkEnv.get.blockManager + private val diskBlockManager = blockManager.diskBlockManager + private val ser = Serializer.getSerializer(serializer) + private val serInstance = ser.newInstance() + + private val conf = SparkEnv.get.conf + private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024 + private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1) + + private def getPartition(key: K): Int = { +if (shouldPartition) partitioner.get.getPartition(key) else 0 + } + + // Data structures to store in-memory objects before we spill. Depending on whether we have an + // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we + // store them in an array buffer. + var map = new SizeTrackingAppendOnlyMap[(Int, K), C] + var buffer = new SizeTrackingBuffer[((Int, K), C)] + + // Track how many elements we've read before we try to estimate memory. Ideally we'd use + // map.size or buffer.size for this, but because users' Aggregators can potentially increase + // the size of a merged element when we add values with the same key, it's safer to track + // elements read from the input iterator.
+ private var elementsRead = 0L + private val trackMemoryThreshold = 1000 + + // Spilling statistics + private var spillCount = 0 + private var _memoryBytesSpilled = 0L + private var _diskBytesSpilled = 0L + + // Collective memory threshold shared across all running tasks + private val maxMemoryThreshold = { +val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3) +val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8) +(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong + } + + // A comparator for keys K that orders them within a partition to allow partial aggregation. + // Can be a partial ordering by hash code if a total ordering is not provided through by the
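A minimal sketch of the memory bookkeeping quoted in the diff context above, assuming a plain heap size in bytes stands in for `Runtime.getRuntime.maxMemory` and the default fractions 0.3 and 0.8 apply. The object `SpillThreshold` and both method names are hypothetical, and the `elementsRead > trackMemoryThreshold` test is only one plausible reading of the comment about counting elements read before estimating memory, not the PR's actual spill condition.

```scala
// Hypothetical helper mirroring the maxMemoryThreshold and elementsRead fields above.
object SpillThreshold {
  // Collective threshold = heap * memoryFraction * safetyFraction, truncated to a Long.
  def maxMemoryThreshold(
      maxMemory: Long,
      memoryFraction: Double = 0.3,
      safetyFraction: Double = 0.8): Long =
    (maxMemory * memoryFraction * safetyFraction).toLong

  // Assumed check: only estimate memory once more than trackMemoryThreshold
  // elements have been read from the input iterator.
  def shouldEstimateMemory(elementsRead: Long, trackMemoryThreshold: Int = 1000): Boolean =
    elementsRead > trackMemoryThreshold
}
```

With the defaults, the threshold is 24% of the heap (about 246 MiB for a 1 GiB heap), and that amount is shared by all running tasks rather than granted to each one.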
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15194652 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15195282 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15196170 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49669025 I've now updated it with, I believe, all of @aarondav's fixes and an optimization he suggested for in-memory-only data (don't bother sorting by key or merge-sorting). I also tried running this on all our test suites by making sort-based shuffle the default, and it almost works, except for a test with custom Product2 types in ShuffledRDD that breaks. However, upon further investigation, I'm not sure that feature was fully supported even in the current version (our current spilling code would also not return the right type), so we might remove that. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
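The in-memory-only optimization mentioned above can be illustrated with a hypothetical sketch: when nothing has spilled and no key ordering is requested, the buffered `((partitionId, key), combiner)` records only need to be bucketed by partition ID, with no key sort and no merge of spill files. The object `InMemoryOnly` and the method `partitionedRecords` are assumptions for illustration; the PR's writer may organize this differently.

```scala
// Hypothetical sketch: with no spill files, records only need bucketing by
// partition ID; keys are neither sorted nor merge-sorted.
object InMemoryOnly {
  def partitionedRecords[K, C](
      buffer: Seq[((Int, K), C)],
      numPartitions: Int): Seq[(Int, Seq[(K, C)])] = {
    // groupBy keeps the original insertion order inside each partition.
    val byPartition = buffer.groupBy(_._1._1)
    (0 until numPartitions).map { p =>
      val records = byPartition.getOrElse(p, Seq.empty[((Int, K), C)])
      // Drop the partition ID from each record, keeping (key, combiner).
      p -> records.map { case ((_, k), c) => (k, c) }
    }
  }
}
```

Emitting every partition ID, including empty ones, keeps the output aligned with the byte ranges a single partitioned file would need.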
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49669437 Actually, I missed one comment, the one on the condition for spilling; I'll have to fix that in both ExternalSorter and EAOM (ExternalAppendOnlyMap).
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user colorant commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15208983 --- Diff: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.sort + +import java.io.{DataInputStream, FileInputStream} + +import org.apache.spark.shuffle._ +import org.apache.spark.{TaskContext, ShuffleDependency} +import org.apache.spark.shuffle.hash.HashShuffleReader +import org.apache.spark.storage.{DiskBlockManager, FileSegment, ShuffleBlockId} + +private[spark] class SortShuffleManager extends ShuffleManager { + /** + * Register a shuffle with the manager and obtain a handle for it to pass to tasks. + */ + override def registerShuffle[K, V, C]( + shuffleId: Int, + numMaps: Int, + dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { +new BaseShuffleHandle(shuffleId, numMaps, dependency) + } + + /** + * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). + * Called on executors by reduce tasks.
+ */ + override def getReader[K, C]( + handle: ShuffleHandle, + startPartition: Int, + endPartition: Int, + context: TaskContext): ShuffleReader[K, C] = { +// We currently use the same block store shuffle fetcher as the hash-based shuffle. +new HashShuffleReader( + handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context) + } + + /** Get a writer for a given partition. Called on executors by map tasks. */ + override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext) + : ShuffleWriter[K, V] = { +new SortShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context) + } + + /** Remove a shuffle's metadata from the ShuffleManager. */ + override def unregisterShuffle(shuffleId: Int): Unit = {} + + /** Shut down this ShuffleManager. */ + override def stop(): Unit = {} + + /** Get the location of a block in a map output file. Uses the index file we create for it. */ + def getBlockLocation(blockId: ShuffleBlockId, diskManager: DiskBlockManager): FileSegment = { +// The block is actually going to be a range of a single map output file for this map, so +// so figure out the ID of the consolidated file, then the offset within that from our index --- End diff -- extra so
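`getBlockLocation` above resolves a reduce partition to a byte range of the single map output file by way of an index file. A minimal sketch of such a lookup, assuming the index holds `numPartitions + 1` cumulative byte offsets written as big-endian longs with `DataOutputStream`; this layout and the names `IndexFileSketch`, `writeIndex` and `blockLocation` are assumptions for illustration, not the PR's code.

```scala
import java.io.{DataInputStream, DataOutputStream, File, FileInputStream, FileOutputStream}

// Hypothetical index layout: offsets(0) = 0, offsets(i + 1) = offsets(i) + length of partition i.
object IndexFileSketch {
  def writeIndex(indexFile: File, partitionLengths: Array[Long]): Unit = {
    val out = new DataOutputStream(new FileOutputStream(indexFile))
    try {
      var offset = 0L
      out.writeLong(offset)
      for (len <- partitionLengths) {
        offset += len
        out.writeLong(offset)
      }
    } finally out.close()
  }

  /** Returns (offset, length) of reduce partition `reduceId` within the data file. */
  def blockLocation(indexFile: File, reduceId: Int): (Long, Long) = {
    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      in.skipBytes(reduceId * 8) // skip the preceding offsets, 8 bytes per long
      val start = in.readLong()
      val end = in.readLong()
      (start, end - start)
    } finally in.close()
  }
}
```

Storing cumulative offsets rather than per-partition lengths means any partition is located with one skip and two reads, independent of how many partitions precede it.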
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user colorant commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15209013 --- Diff: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala --- @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.sort + +import java.io.{BufferedOutputStream, File, FileOutputStream, DataOutputStream} + +import org.apache.spark.shuffle.{ShuffleWriter, BaseShuffleHandle} +import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext} +import org.apache.spark.scheduler.MapStatus +import org.apache.spark.serializer.Serializer +import org.apache.spark.util.collection.ExternalSorter +import org.apache.spark.storage.ShuffleBlockId +import org.apache.spark.executor.ShuffleWriteMetrics --- End diff -- reorder import list?
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15157617 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15157645 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15147889 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala --- @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import java.io._ +import java.util.Comparator + +import scala.collection.mutable.ArrayBuffer + +import com.google.common.io.ByteStreams + +import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner} +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage.BlockId + +/** + * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner + * pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then + * optionally sorts keys within each partition using a custom Comparator. Can output a single + * partitioned file with a different byte range for each partition, suitable for shuffle fetches. + * + * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
+ * + * @param aggregator optional Aggregator with combine functions to use for merging data + * @param partitioner optional partitioner; if given, sort by partition ID and then key + * @param ordering optional ordering to sort keys within each partition + * @param serializer serializer to use + */ +private[spark] class ExternalSorter[K, V, C]( +aggregator: Option[Aggregator[K, V, C]] = None, +partitioner: Option[Partitioner] = None, +ordering: Option[Ordering[K]] = None, +serializer: Option[Serializer] = None) extends Logging { --- End diff -- Note that the Ordering is not currently used, but will allow us to sort keys within each partition so that we can use this in sortByKey --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
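mateiz's note describes where `ordering` would come in: records are compared by partition ID first, then by key within a partition. The sketch below builds such a comparator over the `((partitionId, key), combiner)` records the diff stores. It assumes that records in the same partition compare as equal when no `Ordering[K]` is supplied. `PartitionKeyComparatorSketch` is a hypothetical name, not code from the PR.

```scala
import java.util.Comparator

object PartitionKeyComparatorSketch {
  // Orders ((partitionId, key), combiner) records by partition ID first. If an
  // Ordering[K] is supplied, keys are then ordered within each partition;
  // otherwise records in the same partition compare as equal.
  def comparator[K, C](ordering: Option[Ordering[K]]): Comparator[((Int, K), C)] =
    new Comparator[((Int, K), C)] {
      override def compare(a: ((Int, K), C), b: ((Int, K), C)): Int = {
        val byPartition = java.lang.Integer.compare(a._1._1, b._1._1)
        if (byPartition != 0) byPartition
        else ordering.map(_.compare(a._1._2, b._1._2)).getOrElse(0)
      }
    }
}
```

In the `None` case, a stable sort keeps each partition's records in insertion order. Grouping by partition is all a partition-only shuffle needs.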
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
GitHub user mateiz opened a pull request:

https://github.com/apache/spark/pull/1499

(WIP) SPARK-2045 Sort-based shuffle

This adds a new ShuffleManager based on sorting, as described in https://issues.apache.org/jira/browse/SPARK-2045. The bulk of the code is in an ExternalSorter class. It is similar to ExternalAppendOnlyMap, but it sorts key-value pairs by partition ID and can produce a single sorted file containing a map task's output. (Longer-term, I think this class can take on the remaining functionality of ExternalAppendOnlyMap and replace it, so we don't duplicate code.)

The main TODOs left are:
- enabling ExternalSorter to merge *across* spilled files, though this may be of limited utility (if the data is highly combinable, most combining will happen early on);
- adding more tests (e.g. a version of our shuffle suite that runs on this).

Despite this, it seems to work pretty well. It runs successfully in cases where the hash shuffle would OOM, such as 1000 reduce tasks on executors with only 1G of memory. It also seems to be as fast as or faster than hash-based shuffle, since it creates far fewer files for the OS to keep track of. So I'm posting it to get some early feedback.

After these TODOs are done, I'd also like to enable ExternalSorter to sort data within each partition by key. That would allow us to use it to implement external spilling in reduce tasks in `sortByKey`.
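A single sorted output file per map task works because records arrive in partition-ID order. Each partition then occupies one contiguous byte range, and the writer only has to remember segment lengths.

The sketch below illustrates that bookkeeping in memory. It uses `DataOutputStream.writeUTF` as a stand-in for Spark's `Serializer`. `PartitionedFileSketch`, `writePartitioned` and `offsets` are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

object PartitionedFileSketch {
  // Writes (partitionId, key, value) records into one byte stream and returns
  // the bytes plus the length of each partition's segment. Records must arrive
  // sorted by partition ID so that every partition is one contiguous range.
  def writePartitioned(
      records: Iterator[(Int, String, String)],
      numPartitions: Int): (Array[Byte], Array[Long]) = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    val lengths = new Array[Long](numPartitions)
    var lastPartition = 0
    for ((pid, key, value) <- records) {
      require(pid >= lastPartition, s"records not sorted by partition: $pid after $lastPartition")
      lastPartition = pid
      val before = out.size()
      out.writeUTF(key) // stand-in for a real serializer
      out.writeUTF(value)
      lengths(pid) += out.size() - before
    }
    out.flush()
    (bytes.toByteArray, lengths)
  }

  // Start offset of each partition's segment: prefix sums of the lengths.
  def offsets(lengths: Array[Long]): Array[Long] = lengths.scanLeft(0L)(_ + _).init
}
```

A reader for partition `p` would then fetch the bytes from `offsets(lengths)(p)` through that offset plus `lengths(p)`. Empty partitions simply get a zero-length range.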
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mateiz/spark sort-based-shuffle

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1499.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1499

commit cf014cb3ef394f8b34c717a2ab8c92f95edf6269
Author: Matei Zaharia ma...@databricks.com
Date:   2014-07-17T02:13:54Z

    Scaffolding for sort-based shuffle

commit c1279c4b43121ad47d4e0c0cd46ba978fc29c7de
Author: Matei Zaharia ma...@databricks.com
Date:   2014-07-18T02:07:41Z

    Some more partial work towards sort-based shuffle

commit 784e85a23c8bdea821dbbd2aa43f7e202ac39fe3
Author: Matei Zaharia ma...@databricks.com
Date:   2014-07-18T07:26:49Z

    More partial work towards sort-based shuffle

commit 727685bd23fa83df18f471ece5ded657f8656f81
Author: Matei Zaharia ma...@databricks.com
Date:   2014-07-19T04:18:17Z

    More work

commit abf052dbaafaea604f9b807e8ec5c38bd8e354fa
Author: Matei Zaharia ma...@databricks.com
Date:   2014-07-19T08:35:54Z

    Add more error handling and tests for error cases

commit 066553bc99a389556eb072161fe5acc43dd73262
Author: Matei Zaharia ma...@databricks.com
Date:   2014-07-20T06:40:40Z

    Add spill metrics to map tasks
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49539437 QA tests have started for PR 1499. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16864/consoleFull
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15147904 --- Diff: core/src/main/scala/org/apache/spark/util/collection/SizeTrackingBuffer.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import java.util.Arrays +import java.util.Comparator + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.util.SizeEstimator + +/** + * Append-only buffer that keeps track of its estimated size in bytes. + * We sample with a slow exponential back-off using the SizeEstimator to amortize the time, + * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds). + * + * The tracking code is copied from SizeTrackingAppendOnlyMap -- we'll factor that out soon. + */ +private[spark] class SizeTrackingBuffer[T <: AnyRef](initialCapacity: Int = 64) + extends SizeTrackingCollection[T] --- End diff -- This could be replaced with the buffer in #1165 except we also need destructiveSortedIterator.
And sorting in place in turn requires T to be a subclass of AnyRef (otherwise we'd have to pass the Array[T] to a different Arrays.sort() overload depending on its element type).
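The `T <: AnyRef` bound lets a buffer keep its elements in an `Array[AnyRef]`, which is an `Object[]` on the JVM. It can then sort in place with the single generic `java.util.Arrays.sort(T[], int, int, Comparator)` overload, instead of choosing among the primitive-array overloads.

The sketch below shows this under that assumption. `SortableBufferSketch` is hypothetical and omits the size tracking of the real `SizeTrackingBuffer`.

```scala
import java.util.{Arrays, Comparator}

// Hypothetical minimal append-only buffer (no size tracking).
class SortableBufferSketch[T <: AnyRef](initialCapacity: Int = 64) {
  require(initialCapacity > 0, "initialCapacity must be positive")

  // Array[AnyRef] erases to Object[], so one generic Arrays.sort overload
  // works for every T; a primitive T would need a different overload.
  private var data = new Array[AnyRef](initialCapacity)
  private var curSize = 0

  def +=(elem: T): this.type = {
    if (curSize == data.length) {
      val grown = new Array[AnyRef](data.length * 2)
      System.arraycopy(data, 0, grown, 0, curSize)
      data = grown
    }
    data(curSize) = elem
    curSize += 1
    this
  }

  def size: Int = curSize

  // Sorts the stored elements in place and iterates over them. Destructive in
  // the sense that the original insertion order is lost.
  def destructiveSortedIterator(cmp: Comparator[T]): Iterator[T] = {
    // Safe cast: every stored element is a T.
    Arrays.sort(data, 0, curSize, cmp.asInstanceOf[Comparator[AnyRef]])
    data.iterator.take(curSize).map(_.asInstanceOf[T])
  }
}
```

`new SortableBufferSketch[Int]` would not compile, because `Int` does not conform to `AnyRef`. Reference element types such as the `((Int, K), C)` tuples in ExternalSorter do conform.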
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15148079 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala --- @@ -0,0 +1,390 @@ +package org.apache.spark.util.collection + +import java.io._ +import java.util.Comparator + +import scala.collection.mutable.ArrayBuffer + +import com.google.common.io.ByteStreams + +import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner} +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage.BlockId + +/** + * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner + * pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then + * optionally sorts keys within each partition using a custom Comparator. Can output a single + * partitioned file with a different byte range for each partition, suitable for shuffle fetches. + * + * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
+ * + * @param aggregator optional Aggregator with combine functions to use for merging data + * @param partitioner optional partitioner; if given, sort by partition ID and then key + * @param ordering optional ordering to sort keys within each partition + * @param serializer serializer to use + */ +private[spark] class ExternalSorter[K, V, C]( +aggregator: Option[Aggregator[K, V, C]] = None, +partitioner: Option[Partitioner] = None, +ordering: Option[Ordering[K]] = None, +serializer: Option[Serializer] = None) extends Logging { + + private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1) + private val shouldPartition = numPartitions > 1 + + private val blockManager = SparkEnv.get.blockManager + private val diskBlockManager = blockManager.diskBlockManager + private val ser = Serializer.getSerializer(serializer.getOrElse(null)) + private val serInstance = ser.newInstance() + + private val conf = SparkEnv.get.conf + private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024 + private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1) + + private def getPartition(key: K): Int = { +if (shouldPartition) partitioner.get.getPartition(key) else 0 + } + + // Data structures to store in-memory objects before we spill. Depending on whether we have an + // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we + // store them in an array buffer. + var map = new SizeTrackingAppendOnlyMap[(Int, K), C] + var buffer = new SizeTrackingBuffer[((Int, K), C)] + + // Track how many elements we've read before we try to estimate memory. Ideally we'd use + // map.size or buffer.size for this, but because users' Aggregators can potentially increase + // the size of a merged element when we add values with the same key, it's safer to track + // elements read from the input iterator.
+ private var elementsRead = 0L + private val trackMemoryThreshold = 1000 + + // Spilling statistics + private var spillCount = 0 + private var _memoryBytesSpilled = 0L + private var _diskBytesSpilled = 0L + + // Collective memory threshold shared across all running tasks + private val maxMemoryThreshold = { +val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3) +val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8) +(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong + } + + // For now, just compare them by partition; later we can compare by key as well + private val comparator = new Comparator[((Int, K), C)] { +override def compare(a: ((Int, K), C), b: ((Int, K), C)): Int = { + a._1._1 -
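The quoted comparator orders records by subtracting partition IDs. That is safe here, because partition IDs lie in `[0, numPartitions)`. The subtraction idiom does overflow for arbitrary `Int`s, though. That would matter if the comparator were extended to compare possibly negative values such as key hash codes, which an earlier revision's comment mentions as a partial ordering.

A small sketch contrasting the two approaches follows. `IntCompareSketch` and its methods are hypothetical names.

```scala
object IntCompareSketch {
  // Subtraction-based comparison as in the quoted comparator: correct only when
  // a - b cannot overflow, e.g. when both values are non-negative Ints.
  def bySubtraction(a: Int, b: Int): Int = a - b

  // Overflow-free comparison for arbitrary Ints.
  def byCompare(a: Int, b: Int): Int = java.lang.Integer.compare(a, b)
}
```

For instance, `bySubtraction(Int.MinValue, 1)` wraps around to `Int.MaxValue` and reports the wrong sign, while `byCompare(Int.MinValue, 1)` is negative.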
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15148086 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala --- @@ -0,0 +1,390 @@ +package org.apache.spark.util.collection + +import java.io._ +import java.util.Comparator + +import scala.collection.mutable.ArrayBuffer + +import com.google.common.io.ByteStreams + +import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner} +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage.BlockId + +/** + * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner + * pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then + * optionally sorts keys within each partition using a custom Comparator. Can output a single + * partitioned file with a different byte range for each partition, suitable for shuffle fetches. + * + * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
+ * + * @param aggregator optional Aggregator with combine functions to use for merging data + * @param partitioner optional partitioner; if given, sort by partition ID and then key + * @param ordering optional ordering to sort keys within each partition + * @param serializer serializer to use + */ +private[spark] class ExternalSorter[K, V, C]( +aggregator: Option[Aggregator[K, V, C]] = None, +partitioner: Option[Partitioner] = None, +ordering: Option[Ordering[K]] = None, +serializer: Option[Serializer] = None) extends Logging { + + private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1) + private val shouldPartition = numPartitions > 1 + + private val blockManager = SparkEnv.get.blockManager + private val diskBlockManager = blockManager.diskBlockManager + private val ser = Serializer.getSerializer(serializer.getOrElse(null)) + private val serInstance = ser.newInstance() + + private val conf = SparkEnv.get.conf + private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024 + private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1) + + private def getPartition(key: K): Int = { +if (shouldPartition) partitioner.get.getPartition(key) else 0 + } + + // Data structures to store in-memory objects before we spill. Depending on whether we have an + // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we + // store them in an array buffer. + var map = new SizeTrackingAppendOnlyMap[(Int, K), C] + var buffer = new SizeTrackingBuffer[((Int, K), C)] + + // Track how many elements we've read before we try to estimate memory. Ideally we'd use + // map.size or buffer.size for this, but because users' Aggregators can potentially increase + // the size of a merged element when we add values with the same key, it's safer to track + // elements read from the input iterator.
+ private var elementsRead = 0L + private val trackMemoryThreshold = 1000 + + // Spilling statistics + private var spillCount = 0 + private var _memoryBytesSpilled = 0L + private var _diskBytesSpilled = 0L + + // Collective memory threshold shared across all running tasks + private val maxMemoryThreshold = { +val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3) +val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8) +(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong + } + + // For now, just compare them by partition; later we can compare by key as well + private val comparator = new Comparator[((Int, K), C)] { +override def compare(a: ((Int, K), C), b: ((Int, K), C)): Int = { + a._1._1 -
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49541073 QA results for PR 1499:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
trait SizeTrackingCollection[T] extends Iterable[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16864/consoleFull
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49541476 QA results for PR 1499:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
trait SizeTrackingCollection[T] extends Iterable[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16865/consoleFull
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49555343 QA tests have started for PR 1499. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16878/consoleFull
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49558215 QA results for PR 1499:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
trait SizeTrackingCollection[T] extends Iterable[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16878/consoleFull
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15152201 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala --- @@ -0,0 +1,390 @@ +package org.apache.spark.util.collection + +import java.io._ +import java.util.Comparator + +import scala.collection.mutable.ArrayBuffer + +import com.google.common.io.ByteStreams + +import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner} +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage.BlockId + +/** + * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner + * pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then + * optionally sorts keys within each partition using a custom Comparator. Can output a single + * partitioned file with a different byte range for each partition, suitable for shuffle fetches. + * + * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
+ * + * @param aggregator optional Aggregator with combine functions to use for merging data + * @param partitioner optional partitioner; if given, sort by partition ID and then key + * @param ordering optional ordering to sort keys within each partition + * @param serializer serializer to use + */ +private[spark] class ExternalSorter[K, V, C]( +aggregator: Option[Aggregator[K, V, C]] = None, +partitioner: Option[Partitioner] = None, +ordering: Option[Ordering[K]] = None, +serializer: Option[Serializer] = None) extends Logging { + + private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1) + private val shouldPartition = numPartitions > 1 + + private val blockManager = SparkEnv.get.blockManager + private val diskBlockManager = blockManager.diskBlockManager + private val ser = Serializer.getSerializer(serializer.getOrElse(null)) --- End diff -- It appears that you could just do `Serializer.getSerializer(serializer)`, thanks to code you added last month :)
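aarondav's suggestion relies on `Serializer.getSerializer` having an overload that accepts the `Option` directly. The pattern is sketched below with stand-in types. `Ser`, `JavaSer` and `SerializerSketch` are hypothetical. The fallback to a default serializer for `None` is assumed here, not quoted from Spark.

```scala
object SerializerSketch {
  trait Ser { def name: String }
  object JavaSer extends Ser { val name = "java" }

  // Stand-in for the configured default serializer.
  val defaultSer: Ser = JavaSer

  // Null-tolerant variant, matching call sites like serializer.getOrElse(null).
  def getSerializer(s: Ser): Ser = if (s != null) s else defaultSer

  // Option variant: callers can pass their Option[Ser] straight through.
  def getSerializer(s: Option[Ser]): Ser = s.getOrElse(defaultSer)
}
```

With the `Option` overload, the call site no longer has to convert `None` into `null` only for the helper to convert it back into a default.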
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15152216 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala --- @@ -0,0 +1,390 @@ +package org.apache.spark.util.collection + +import java.io._ +import java.util.Comparator + +import scala.collection.mutable.ArrayBuffer + +import com.google.common.io.ByteStreams + +import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner} +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage.BlockId + +/** + * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner + * pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then + * optionally sorts keys within each partition using a custom Comparator. Can output a single + * partitioned file with a different byte range for each partition, suitable for shuffle fetches. + * + * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
+ * + * @param aggregator optional Aggregator with combine functions to use for merging data + * @param partitioner optional partitioner; if given, sort by partition ID and then key + * @param ordering optional ordering to sort keys within each partition + * @param serializer serializer to use + */ +private[spark] class ExternalSorter[K, V, C]( +aggregator: Option[Aggregator[K, V, C]] = None, +partitioner: Option[Partitioner] = None, +ordering: Option[Ordering[K]] = None, +serializer: Option[Serializer] = None) extends Logging { + + private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1) + private val shouldPartition = numPartitions > 1 + + private val blockManager = SparkEnv.get.blockManager + private val diskBlockManager = blockManager.diskBlockManager + private val ser = Serializer.getSerializer(serializer.getOrElse(null)) + private val serInstance = ser.newInstance() + + private val conf = SparkEnv.get.conf + private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024 + private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1) + + private def getPartition(key: K): Int = { +if (shouldPartition) partitioner.get.getPartition(key) else 0 + } + + // Data structures to store in-memory objects before we spill. Depending on whether we have an + // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we + // store them in an array buffer. + var map = new SizeTrackingAppendOnlyMap[(Int, K), C] --- End diff -- It appears that we'll use either the map or the buffer throughout the whole sorting process (rather than switching from one to the other). In that case, would it make sense to put a `val usingMap/shouldCombine = aggregator.isDefined` out here, and then avoid passing it around elsewhere?
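aarondav's suggestion amounts to deciding once, at construction time, whether records are combined in a map or appended to a buffer. The sketch below shows that shape under two assumptions: a plain merge function `(V, V) => V` stands in for Spark's `Aggregator`, and ordinary Scala collections stand in for the size-tracking ones. `CombineOrBufferSketch` and `insertAll` are hypothetical names.

```scala
import scala.collection.mutable

// Hypothetical, simplified sorter front end: whether values are combined is
// decided once, from the constructor argument, rather than passed around.
class CombineOrBufferSketch[K, V](mergeValue: Option[(V, V) => V]) {
  private val usingMap = mergeValue.isDefined

  private val map = mutable.LinkedHashMap.empty[K, V]
  private val buffer = mutable.ArrayBuffer.empty[(K, V)]

  def insertAll(records: Iterator[(K, V)]): Unit =
    if (usingMap) {
      val merge = mergeValue.get
      records.foreach { case (k, v) =>
        // Combine with the existing value for k, if any.
        map.update(k, map.get(k).map(merge(_, v)).getOrElse(v))
      }
    } else {
      buffer ++= records
    }

  def result: Seq[(K, V)] = if (usingMap) map.toSeq else buffer.toSeq
}
```

Since `usingMap` never changes after construction, helper methods can read the field directly instead of taking a flag parameter.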
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15152231 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala --- @@ -0,0 +1,390 @@ +package org.apache.spark.util.collection + +import java.io._ +import java.util.Comparator + +import scala.collection.mutable.ArrayBuffer + +import com.google.common.io.ByteStreams + +import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner} +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage.BlockId + +/** + * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner + * pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then + * optionally sorts keys within each partition using a custom Comparator. Can output a single + * partitioned file with a different byte range for each partition, suitable for shuffle fetches. + * + * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
+ * + * @param aggregator optional Aggregator with combine functions to use for merging data + * @param partitioner optional partitioner; if given, sort by partition ID and then key + * @param ordering optional ordering to sort keys within each partition + * @param serializer serializer to use + */ +private[spark] class ExternalSorter[K, V, C]( +aggregator: Option[Aggregator[K, V, C]] = None, +partitioner: Option[Partitioner] = None, +ordering: Option[Ordering[K]] = None, +serializer: Option[Serializer] = None) extends Logging { --- End diff -- Let's make sure we have at least a comment saying this in the code, if the PR is to be merged while that's true.
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15152282 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala --- @@ -0,0 +1,390 @@ +package org.apache.spark.util.collection + +import java.io._ +import java.util.Comparator + +import scala.collection.mutable.ArrayBuffer + +import com.google.common.io.ByteStreams + +import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner} +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage.BlockId + +/** + * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner + * pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then + * optionally sorts keys within each partition using a custom Comparator. Can output a single + * partitioned file with a different byte range for each partition, suitable for shuffle fetches. + * + * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
+ * + * @param aggregator optional Aggregator with combine functions to use for merging data + * @param partitioner optional partitioner; if given, sort by partition ID and then key + * @param ordering optional ordering to sort keys within each partition + * @param serializer serializer to use + */ +private[spark] class ExternalSorter[K, V, C]( +aggregator: Option[Aggregator[K, V, C]] = None, +partitioner: Option[Partitioner] = None, +ordering: Option[Ordering[K]] = None, +serializer: Option[Serializer] = None) extends Logging { + + private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1) + private val shouldPartition = numPartitions > 1 + + private val blockManager = SparkEnv.get.blockManager + private val diskBlockManager = blockManager.diskBlockManager + private val ser = Serializer.getSerializer(serializer.getOrElse(null)) + private val serInstance = ser.newInstance() + + private val conf = SparkEnv.get.conf + private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024 + private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1) + + private def getPartition(key: K): Int = { +if (shouldPartition) partitioner.get.getPartition(key) else 0 + } + + // Data structures to store in-memory objects before we spill. Depending on whether we have an + // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we + // store them in an array buffer. + var map = new SizeTrackingAppendOnlyMap[(Int, K), C] + var buffer = new SizeTrackingBuffer[((Int, K), C)] + + // Track how many elements we've read before we try to estimate memory. Ideally we'd use + // map.size or buffer.size for this, but because users' Aggregators can potentially increase + // the size of a merged element when we add values with the same key, it's safer to track + // elements read from the input iterator.
+ private var elementsRead = 0L + private val trackMemoryThreshold = 1000 + + // Spilling statistics + private var spillCount = 0 + private var _memoryBytesSpilled = 0L + private var _diskBytesSpilled = 0L + + // Collective memory threshold shared across all running tasks + private val maxMemoryThreshold = { +val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3) +val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8) +(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong + } + + // For now, just compare them by partition; later we can compare by key as well + private val comparator = new Comparator[((Int, K), C)] { +override def compare(a: ((Int, K), C), b: ((Int, K), C)): Int = { +
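The memory bookkeeping in this part of the diff can be isolated into a small standalone sketch. This is not the PR's code: `MemoryBudget` and its method names are hypothetical. Only the formula (max heap times `spark.shuffle.memoryFraction` times `spark.shuffle.safetyFraction`), the defaults 0.3 and 0.8, and the 1000-element tracking threshold come from the diff.

```scala
// Hypothetical, standalone sketch of the memory bookkeeping quoted in the diff.
// The object and method names are illustrative assumptions, not Spark APIs.
object MemoryBudget {
  // Mirrors `trackMemoryThreshold = 1000`: skip the (costly) size estimate
  // until this many input elements have been read.
  val trackMemoryThreshold: Long = 1000L

  // Collective budget shared by all running tasks:
  // maxHeap * memoryFraction * safetyFraction, using the diff's defaults.
  def maxMemoryThreshold(
      maxHeapBytes: Long = Runtime.getRuntime.maxMemory,
      memoryFraction: Double = 0.3,
      safetyFraction: Double = 0.8): Long =
    (maxHeapBytes * memoryFraction * safetyFraction).toLong

  // Counting elements read (rather than map.size) still triggers checks when
  // an Aggregator keeps merging values into a few growing combiners.
  def shouldEstimateMemory(elementsRead: Long): Boolean =
    elementsRead > trackMemoryThreshold
}
```

With the defaults, the shared budget works out to 24% of the maximum heap (0.3 × 0.8).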
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49561754 QA tests have started for PR 1499. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16882/consoleFull
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49564210 QA results for PR 1499:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
trait SizeTrackingCollection[T] extends Iterable[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16882/consoleFull
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49564761 QA tests have started for PR 1499. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16885/consoleFull
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15153139 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala --- @@ -0,0 +1,390 @@ + // Data structures to store in-memory objects before we spill. Depending on whether we have an + // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we + // store them in an array buffer. + var map = new SizeTrackingAppendOnlyMap[(Int, K), C] --- End diff -- I left it this way because we might want to disable partial aggregation partway through if we see it's not aggregating too much.
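The reply above suggests keeping both in-memory structures available so that partial aggregation could be switched off partway through a task. A minimal sketch of that idea follows. Its assumptions: a plain `mutable.HashMap` stands in for `SizeTrackingAppendOnlyMap`, and the class name `SwitchableCombiner`, the 1000-record sample, and the 0.9 distinct-key ratio are all hypothetical choices, not taken from the PR.

```scala
import scala.collection.mutable

// Hypothetical sketch: map-side combining that can be switched off partway
// through when it is not reducing the data much. Sample size and ratio are
// illustrative assumptions.
class SwitchableCombiner[K, V, C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    sampleSize: Int = 1000,
    maxDistinctRatio: Double = 0.9) {

  private val map = mutable.HashMap.empty[K, C]
  private val buffer = mutable.ArrayBuffer.empty[(K, C)]
  private var recordsRead = 0L
  private var combining = true

  def insert(key: K, value: V): Unit = {
    recordsRead += 1
    if (combining) {
      map.get(key) match {
        case Some(c) => map.update(key, mergeValue(c, value))
        case None => map.update(key, createCombiner(value))
      }
      // After the sample, stop combining if nearly every record had a new key.
      if (recordsRead == sampleSize && map.size.toDouble / recordsRead > maxDistinctRatio) {
        buffer ++= map
        map.clear()
        combining = false
      }
    } else {
      // No merging any more: each record becomes its own combiner.
      buffer += ((key, createCombiner(value)))
    }
  }

  def isCombining: Boolean = combining

  def iterator: Iterator[(K, C)] = if (combining) map.iterator else buffer.iterator
}
```

After the switch, the output may repeat a key. A later merge step, such as the reduce side, still has to combine those entries.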
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15153152 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala --- @@ -0,0 +1,390 @@ + // For now, just compare them by partition; later we can compare by key as well + private val comparator = new Comparator[((Int, K), C)] { +override def compare(a: ((Int, K), C), b: ((Int, K), C)): Int = { + a._1._1
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49565532 I've now updated this to support partial aggregation across spilled files, even when we don't have an Ordering, using hash code comparison similar to ExternalAppendOnlyMap. It also now fully implements the behavior in the docs, namely sorting the data if you pass an Ordering, etc. It looks like Aaron found a problem with the size-tracking code; I'll try to fix that in ExternalAppendOnlyMap as well. Once this class is in, though, it could replace ExternalAppendOnlyMap in most use cases. Its one downside is that it creates another object for each key of the in-memory collection (since we have `((Int, K), C)` pairs to allow sorting by partition). Might still be worth it long-term though. On the flip side, I think the hash-based merging code here is more efficient, avoiding a bunch of `ArrayBuffer.remove` calls.
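The hash-based merge described in the comment above (merging spilled runs when no Ordering is available) can be sketched as follows. This is an illustration under stated assumptions, not the PR's implementation. Each run is assumed to be already sorted by key `hashCode` within a single partition, in-memory iterators stand in for spill files, and `HashMerge.mergeByHash` is a hypothetical name. Distinct keys can share a hash code, so records with the same hash are grouped first and then combined by `==`.

```scala
import scala.collection.mutable

// Hypothetical sketch, not the PR's code. Each run must already be sorted by
// key hash code (one partition only); in-memory iterators stand in for spills.
object HashMerge {
  private def hash(k: Any): Int = if (k == null) 0 else k.hashCode

  def mergeByHash[K, C](
      runs: Seq[Iterator[(K, C)]],
      mergeCombiners: (C, C) => C): Iterator[(K, C)] = {
    // Min-heap keyed on the hash of each run's next key.
    val heap = mutable.PriorityQueue.empty[BufferedIterator[(K, C)]](
      Ordering.by[BufferedIterator[(K, C)], Int](it => hash(it.head._1)).reverse)
    heap ++= runs.map(_.buffered).filter(_.hasNext)

    new Iterator[(K, C)] {
      // Combined records for the hash code currently being emitted.
      private var pending: Iterator[(K, C)] = Iterator.empty

      def hasNext: Boolean = pending.hasNext || heap.nonEmpty

      def next(): (K, C) = {
        if (!pending.hasNext) pending = nextHashGroup()
        pending.next()
      }

      // Drain every record whose key has the smallest remaining hash code, then
      // combine records whose keys are actually equal (hashes can collide).
      private def nextHashGroup(): Iterator[(K, C)] = {
        val minHash = hash(heap.head.head._1)
        val group = mutable.ArrayBuffer.empty[(K, C)]
        while (heap.nonEmpty && hash(heap.head.head._1) == minHash) {
          val it = heap.dequeue()
          while (it.hasNext && hash(it.head._1) == minHash) {
            val (k, c) = it.next()
            val i = group.indexWhere(_._1 == k)
            if (i >= 0) group(i) = (k, mergeCombiners(group(i)._2, c))
            else group += ((k, c))
          }
          if (it.hasNext) heap.enqueue(it)
        }
        group.iterator
      }
    }
  }
}
```

Because every run is ordered by hash code, only the records that share the smallest current hash need to be held in memory at once. That is what makes merging possible without a total Ordering on the keys.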
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49565536 QA tests have started for PR 1499. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16886/consoleFull
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49568383 QA results for PR 1499:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
trait SizeTrackingCollection[T] extends Iterable[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16886/consoleFull
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15154006 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala --- @@ -0,0 +1,573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import java.io._ +import java.util.Comparator + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable + +import com.google.common.io.ByteStreams + +import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner} +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage.BlockId + +/** + * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner + * pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then + * optionally sorts keys within each partition using a custom Comparator. Can output a single + * partitioned file with a different byte range for each partition, suitable for shuffle fetches. + * + * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
+ * + * @param aggregator optional Aggregator with combine functions to use for merging data + * @param partitioner optional partitioner; if given, sort by partition ID and then key + * @param ordering optional ordering to sort keys within each partition + * @param serializer serializer to use when spilling to disk + */ +private[spark] class ExternalSorter[K, V, C]( +aggregator: Option[Aggregator[K, V, C]] = None, +partitioner: Option[Partitioner] = None, +ordering: Option[Ordering[K]] = None, +serializer: Option[Serializer] = None) extends Logging { + + private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1) + private val shouldPartition = numPartitions > 1 + + private val blockManager = SparkEnv.get.blockManager + private val diskBlockManager = blockManager.diskBlockManager + private val ser = Serializer.getSerializer(serializer) + private val serInstance = ser.newInstance() + + private val conf = SparkEnv.get.conf + private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024 + private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1) + + private def getPartition(key: K): Int = { +if (shouldPartition) partitioner.get.getPartition(key) else 0 + } + + // Data structures to store in-memory objects before we spill. Depending on whether we have an + // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we + // store them in an array buffer. + var map = new SizeTrackingAppendOnlyMap[(Int, K), C] + var buffer = new SizeTrackingBuffer[((Int, K), C)] + + // Track how many elements we've read before we try to estimate memory. Ideally we'd use + // map.size or buffer.size for this, but because users' Aggregators can potentially increase + // the size of a merged element when we add values with the same key, it's safer to track + // elements read from the input iterator.
+ private var elementsRead = 0L + private val trackMemoryThreshold = 1000 + + // Spilling statistics + private var spillCount = 0 + private var _memoryBytesSpilled = 0L + private var _diskBytesSpilled = 0L + + // Collective memory threshold shared across all running tasks + private val maxMemoryThreshold = { +val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3) +val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8) +(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong + } + + // A comparator for keys K that orders them within a partition to allow partial aggregation. + // Can be a partial ordering by hash code if a total ordering is not provided through by the
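The comparator described in this diff comment has two parts: a caller-supplied total Ordering when one exists, otherwise a partial ordering by hash code, applied after sorting by partition ID. It might look roughly like this. `SorterComparators` and both method names are hypothetical. Hashing null keys as 0 is also an assumption, made so the fallback does not throw.

```scala
import java.util.Comparator

// Hypothetical sketch; names are illustrative, not the PR's code.
object SorterComparators {
  // Prefer the caller's total Ordering. Otherwise fall back to a partial
  // ordering by hash code: distinct keys with equal hashes compare as 0.
  def keyComparator[K](ordering: Option[Ordering[K]]): Comparator[K] =
    ordering.getOrElse(new Ordering[K] {
      private def hash(k: K): Int = if (k == null) 0 else k.hashCode
      def compare(a: K, b: K): Int = Integer.compare(hash(a), hash(b))
    })

  // Sort ((partitionId, key), combiner) records by partition ID, then by key.
  def partitionKeyComparator[K, C](keyCmp: Comparator[K]): Comparator[((Int, K), C)] =
    new Comparator[((Int, K), C)] {
      def compare(a: ((Int, K), C), b: ((Int, K), C)): Int = {
        val byPartition = Integer.compare(a._1._1, b._1._1)
        if (byPartition != 0) byPartition else keyCmp.compare(a._1._2, b._1._2)
      }
    }
}
```

With the hash-code fallback, records for equal keys end up adjacent only up to hash collisions. Any merge step must therefore still compare keys with `==` inside each equal-hash group.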
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15154080 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala --- @@ -0,0 +1,573 @@
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15154163 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala --- @@ -0,0 +1,573 @@
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15154182

--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. Can output a single
+ * partitioned file with a different byte range for each partition, suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use when spilling to disk
+ */
+private[spark] class ExternalSorter[K, V, C](
+    aggregator: Option[Aggregator[K, V, C]] = None,
+    partitioner: Option[Partitioner] = None,
+    ordering: Option[Ordering[K]] = None,
+    serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+    if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can potentially increase
+  // the size of a merged element when we add values with the same key, it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+    val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+    val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+    (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // A comparator for keys K that orders them within a partition to allow partial aggregation.
+  // Can be a partial ordering by hash code if a total ordering is not provided through by the
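The class comment in the diff above describes a two-level order: records are grouped by the partition ID the `Partitioner` assigns, and keys are optionally sorted within each partition. A minimal in-memory sketch of that ordering follows, assuming plain Scala collections and a plain `K => Int` function in place of a Spark `Partitioner`. `PartitionedSortSketch` and `partitionAndSort` are hypothetical names, and the sketch deliberately leaves out spilling, serialization, and the single partitioned output file the real class produces.

```scala
// Hypothetical sketch (not Spark's ExternalSorter): an in-memory "sort by partition ID,
// then by key" with no spilling, no serializer, and no partitioned output file.
object PartitionedSortSketch {
  /** Returns one Vector of (key, value) pairs per partition, keys sorted if an ordering is given. */
  def partitionAndSort[K, V](
      records: Seq[(K, V)],
      numPartitions: Int,
      getPartition: K => Int,
      ordering: Option[Ordering[K]]): Vector[Vector[(K, V)]] = {
    // Tag each record with its partition ID, similar to the ((Int, K), C) layout in the diff.
    val tagged = records.map { case (k, v) => (getPartition(k), k, v) }
    // Stable sort by partition ID first, then by key when an ordering is available.
    val sorted = ordering match {
      case Some(ord) =>
        tagged.sortWith { (a, b) =>
          if (a._1 != b._1) a._1 < b._1 else ord.lt(a._2, b._2)
        }
      case None =>
        tagged.sortBy(_._1)
    }
    // Collect each partition's contiguous run; a linear scan per partition keeps the sketch simple.
    (0 until numPartitions).toVector.map { p =>
      sorted.iterator.filter(_._1 == p).map { case (_, k, v) => (k, v) }.toVector
    }
  }
}
```

With `k => k % 2` as the partition function and `Some(Ordering.Int)`, the records `(3, "c"), (1, "a"), (4, "d"), (2, "b")` come out as partition 0 = `(2, "b"), (4, "d")` and partition 1 = `(1, "a"), (3, "c")`. Without an ordering, each partition keeps its input order because `sortBy` is stable.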
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15154203

--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
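The `maxMemoryThreshold` field in the ExternalSorter diff multiplies the JVM's maximum heap by `spark.shuffle.memoryFraction` (default 0.3 in the diff) and `spark.shuffle.safetyFraction` (default 0.8). With the defaults, the collective threshold is therefore 0.3 * 0.8 = 0.24 of the heap. The sketch below restates that arithmetic as a standalone function. `MemoryThresholdSketch` is a hypothetical name, and the function takes the heap size as a parameter instead of reading `Runtime.getRuntime.maxMemory`, which keeps the result reproducible.

```scala
// Hypothetical standalone restatement of the maxMemoryThreshold arithmetic in the diff.
object MemoryThresholdSketch {
  // Default fractions mirror the diff: spark.shuffle.memoryFraction = 0.3,
  // spark.shuffle.safetyFraction = 0.8. The product is truncated toward zero by toLong.
  def maxMemoryThreshold(
      maxHeapBytes: Long,
      memoryFraction: Double = 0.3,
      safetyFraction: Double = 0.8): Long =
    (maxHeapBytes * memoryFraction * safetyFraction).toLong
}
```

For a 1 GiB heap (1073741824 bytes), this gives 257698037 bytes, roughly 246 MiB. Per the diff comment, that budget is shared across all running tasks, not given to each one. Calling `MemoryThresholdSketch.maxMemoryThreshold(Runtime.getRuntime.maxMemory)` reproduces the field's computation on the current JVM.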
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15154224

--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
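The last quoted comment in the ExternalSorter diff says the key comparator can be a partial ordering by hash code when no total ordering is provided. Sorting by hash code places every record with a given key inside one contiguous run of equal hash codes. However, distinct keys whose hash codes collide share that run, so a second pass has to check real equality before combining. A minimal sketch of that idea follows, assuming an in-memory `Seq` and a caller-supplied merge function. `HashOrderedCombineSketch`, `hashComparator`, and `sortAndCombine` are hypothetical names, not the code under review.

```scala
import java.util.Comparator
import scala.collection.mutable

// Hypothetical sketch: combine values when only a hash-code (partial) ordering is available.
object HashOrderedCombineSketch {
  // Orders keys by hash code only. Equal keys always compare as 0, but so do distinct keys
  // whose hash codes collide, which is what makes this a partial ordering.
  def hashComparator[K]: Comparator[K] = new Comparator[K] {
    override def compare(a: K, b: K): Int = Integer.compare(a.hashCode, b.hashCode)
  }

  /** Sorts by hash code, then merges values of truly equal keys inside each equal-hash run. */
  def sortAndCombine[K, V](records: Seq[(K, V)], mergeValue: (V, V) => V): Seq[(K, V)] = {
    val cmp = hashComparator[K]
    // Vector gives cheap indexing; sortWith is stable, so input order survives within a run.
    val sorted = records.toVector.sortWith((a, b) => cmp.compare(a._1, b._1) < 0)
    val out = mutable.ArrayBuffer.empty[(K, V)]
    var i = 0
    while (i < sorted.length) {
      // Find the run [i, j) of records whose keys share a hash code.
      var j = i + 1
      while (j < sorted.length && cmp.compare(sorted(i)._1, sorted(j)._1) == 0) j += 1
      // Hash equality is not key equality, so group the run by == before merging.
      val combined = mutable.LinkedHashMap.empty[K, V]
      for ((k, v) <- sorted.slice(i, j)) {
        combined(k) = combined.get(k) match {
          case Some(acc) => mergeValue(acc, v)
          case None      => v
        }
      }
      out ++= combined
      i = j
    }
    out.toSeq
  }
}
```

`"Aa"` and `"BB"` have the same `String.hashCode` (2112), so they land in one run, yet the equality pass keeps them apart: `sortAndCombine[String, Int](Seq(("Aa", 1), ("BB", 2), ("Aa", 3), ("x", 4)), _ + _)` yields `("x", 4), ("Aa", 4), ("BB", 2)`.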
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15154239

--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
+ * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID and then key
+ * @param ordering optional ordering to sort keys within each partition
--- End diff --

Perhaps note that the ordering is assumed total -- it's probably obvious, but since we support partial ordering in this class, may not hurt to mention it

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
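The reviewer's suggestion rests on the difference between the two kinds of comparator this class can end up with. A caller-supplied `Ordering[K]` is expected to be total, so `compare` returns 0 only for equal keys. A hash-code fallback, by contrast, returns 0 for any two keys whose hash codes collide. The sketch below makes that concrete under an assumed selection rule: use the `Ordering` if one is given, else compare hash codes. `OrderingTotalitySketch` and `keyComparator` are hypothetical names.

```scala
import java.util.Comparator

// Hypothetical sketch of choosing a key comparator; the selection rule is an assumption.
object OrderingTotalitySketch {
  // Prefer the caller's Ordering (expected to be total); otherwise fall back to a
  // partial ordering on hash codes.
  def keyComparator[K](ordering: Option[Ordering[K]]): Comparator[K] =
    ordering.getOrElse(new Comparator[K] {
      // Distinct keys with colliding hash codes compare as 0 here.
      override def compare(a: K, b: K): Int = Integer.compare(a.hashCode, b.hashCode)
    })
}
```

With no ordering, `"Aa"` and `"BB"` compare as 0 even though they are different strings. With `Ordering.String`, they compare as negative. Code that treats `compare(a, b) == 0` as key equality is only safe with the total ordering, which is why documenting that `ordering` is assumed total is a cheap safeguard.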
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15154289

--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15154344

--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15154387

--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15154394

--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15154433

--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. Can output a single
+ * partitioned file with a different byte range for each partition, suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use when spilling to disk
+ */
+private[spark] class ExternalSorter[K, V, C](
+    aggregator: Option[Aggregator[K, V, C]] = None,
+    partitioner: Option[Partitioner] = None,
+    ordering: Option[Ordering[K]] = None,
+    serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+    if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can potentially increase
+  // the size of a merged element when we add values with the same key, it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+    val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+    val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+    (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // A comparator for keys K that orders them within a partition to allow partial aggregation.
+  // Can be a partial ordering by hash code if a total ordering is not provided through by the
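The hunk above keys in-memory records by `(partition ID, key)` so the sorter can order them by partition first and then, optionally, by key. Its trailing comment notes that a hash-code ordering can stand in when no total ordering is supplied. Below is a minimal sketch of that comparator shape for the non-combining (buffer) path only. It assumes a simple hash partitioner. `partitionOf`, `keyComparator`, `partitionKeyComparator` and `sortedRecords` are hypothetical helpers rather than this PR's API, and a plain `ArrayBuffer` stands in for `SizeTrackingBuffer`.

```scala
import java.util.Comparator
import scala.collection.mutable.ArrayBuffer

object PartitionedSortSketch {
  // Hypothetical partitioner: non-negative hashCode modulo numPartitions. Like getPartition
  // in the diff, it sends every key to partition 0 when there is only one partition.
  def partitionOf[K](key: K, numPartitions: Int): Int =
    if (numPartitions > 1) {
      val mod = key.hashCode % numPartitions
      if (mod < 0) mod + numPartitions else mod
    } else 0

  // Use the caller's total ordering if there is one (Scala's Ordering extends Comparator);
  // otherwise compare hash codes, which is only a partial ordering.
  def keyComparator[K](ordering: Option[Ordering[K]]): Comparator[K] = ordering match {
    case Some(o) => o
    case None => new Comparator[K] {
      override def compare(a: K, b: K): Int = Integer.compare(a.hashCode, b.hashCode)
    }
  }

  // Order (partition ID, key) pairs by partition ID first, then by key within a partition.
  def partitionKeyComparator[K](keyCmp: Comparator[K]): Comparator[(Int, K)] =
    new Comparator[(Int, K)] {
      override def compare(a: (Int, K), b: (Int, K)): Int = {
        val byPartition = Integer.compare(a._1, b._1)
        if (byPartition != 0) byPartition else keyCmp.compare(a._2, b._2)
      }
    }

  // Buffer records as ((partition ID, key), value) without combining, then sort the buffer.
  // sortWith is stable, so records with equal (partition, key) keep their input order.
  def sortedRecords[K, V](
      records: Seq[(K, V)],
      numPartitions: Int,
      ordering: Option[Ordering[K]]): List[((Int, K), V)] = {
    val buffer = ArrayBuffer.empty[((Int, K), V)]
    for ((k, v) <- records) {
      val entry: ((Int, K), V) = ((partitionOf(k, numPartitions), k), v)
      buffer += entry
    }
    val cmp = partitionKeyComparator(keyComparator(ordering))
    buffer.sortWith((x, y) => cmp.compare(x._1, y._1) < 0).toList
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(("b", 1), ("a", 2), ("c", 3), ("a", 4))
    // "a", "b", "c" hash to 97, 98, 99, so with 2 partitions "b" lands in 0 and "a", "c" in 1.
    sortedRecords(data, 2, Some(Ordering[String])).foreach(println)
    // prints ((0,b),1), ((1,a),2), ((1,a),4), ((1,c),3), one per line
  }
}
```

A hash-code comparator treats distinct keys with equal hash codes as equal. For that reason, a step that merges values for "equal" keys under such an ordering would still need to check actual key equality.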
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15154451 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15154478 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15154485 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15154509 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15154525 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15154547 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---