spark git commit: [SPARK-11216][SQL][FOLLOW-UP] add encoder/decoder for external row
Repository: spark Updated Branches: refs/heads/master f6d06adf0 -> 42d225f44 [SPARK-11216][SQL][FOLLOW-UP] add encoder/decoder for external row address comments in https://github.com/apache/spark/pull/9184 Author: Wenchen FanCloses #9212 from cloud-fan/encoder. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42d225f4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42d225f4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42d225f4 Branch: refs/heads/master Commit: 42d225f449c633be7465493c57b9881303ee14ba Parents: f6d06ad Author: Wenchen Fan Authored: Thu Oct 22 10:53:59 2015 -0700 Committer: Michael Armbrust Committed: Thu Oct 22 10:53:59 2015 -0700 -- .../spark/sql/catalyst/encoders/ClassEncoder.scala| 14 +++--- .../spark/sql/catalyst/encoders/RowEncoder.scala | 9 ++--- .../spark/sql/catalyst/expressions/objects.scala | 8 +++- .../spark/sql/catalyst/encoders/RowEncoderSuite.scala | 2 +- 4 files changed, 17 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/42d225f4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ClassEncoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ClassEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ClassEncoder.scala index f3a1063..54096f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ClassEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ClassEncoder.scala @@ -48,20 +48,12 @@ case class ClassEncoder[T]( private val dataType = ObjectType(clsTag.runtimeClass) override def toRow(t: T): InternalRow = { -if (t == null) { - null -} else { - inputRow(0) = t - extractProjection(inputRow) -} +inputRow(0) = t +extractProjection(inputRow) } override def fromRow(row: InternalRow): T = { -if (row eq null) { - null.asInstanceOf[T] -} else { - constructProjection(row).get(0, dataType).asInstanceOf[T] -} +constructProjection(row).get(0, dataType).asInstanceOf[T] } override def bind(schema: Seq[Attribute]): ClassEncoder[T] = { http://git-wip-us.apache.org/repos/asf/spark/blob/42d225f4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala index 3e74aab..5142856 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala @@ -26,8 +26,11 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +/** + * A factory for constructing encoders that convert external row to/from the Spark SQL + * internal binary representation. 
+ */ object RowEncoder { - def apply(schema: StructType): ClassEncoder[Row] = { val cls = classOf[Row] val inputObject = BoundReference(0, ObjectType(cls), nullable = true) @@ -136,7 +139,7 @@ object RowEncoder { constructorFor(BoundReference(i, f.dataType, f.nullable), f.dataType) ) } -CreateRow(fields) +CreateExternalRow(fields) } private def constructorFor(input: Expression, dataType: DataType): Expression = dataType match { @@ -195,7 +198,7 @@ object RowEncoder { Literal.create(null, externalDataTypeFor(f.dataType)), constructorFor(getField(input, i, f.dataType), f.dataType)) } - CreateRow(convertedFields) + CreateExternalRow(convertedFields) } private def getField( http://git-wip-us.apache.org/repos/asf/spark/blob/42d225f4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala index 8fc00ad..b42d6c5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala @@ -456,7 +456,13 @@ case class MapObjects( } } -case class CreateRow(children: Seq[Expression]) extends Expression {
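The commit above adds a factory that builds encoders converting external `Row` objects to and from Spark SQL's internal binary representation. As a hedged, Spark-free illustration of the general encoder shape (a `toRow`/`fromRow` pair over a flat internal form), consider the sketch below; `SimpleEncoder`, `Person`, and `EncoderDemo` are hypothetical names for this sketch only, not Spark's actual API.

```scala
// Plain-Scala illustration of the encoder shape: serialize an external value into a
// flat "internal row" and deserialize it back. Illustrative stand-ins only.
final case class Person(name: String, age: Int)

trait SimpleEncoder[T] {
  def toRow(value: T): Array[Any]    // external object -> internal representation
  def fromRow(row: Array[Any]): T    // internal representation -> external object
}

object PersonEncoder extends SimpleEncoder[Person] {
  def toRow(p: Person): Array[Any] = Array(p.name, p.age)
  def fromRow(row: Array[Any]): Person =
    Person(row(0).asInstanceOf[String], row(1).asInstanceOf[Int])
}

object EncoderDemo extends App {
  val internal = PersonEncoder.toRow(Person("ada", 36))
  assert(PersonEncoder.fromRow(internal) == Person("ada", 36))  // round-trips
  println(internal.mkString("[", ", ", "]"))                    // prints: [ada, 36]
}
```

Note how the diff above removes the null checks from `toRow`/`fromRow`, leaving null handling to callers; the sketch likewise assumes non-null input.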
spark git commit: [SPARK-11232][CORE] Use 'offer' instead of 'put' to make sure calling send won't be interrupted
Repository: spark Updated Branches: refs/heads/master 42d225f44 -> 7bb6d31cf

[SPARK-11232][CORE] Use 'offer' instead of 'put' to make sure calling send won't be interrupted

The current `NettyRpcEndpointRef.send` can be interrupted because it uses `LinkedBlockingQueue.put`, which may hang the application. Imagine the following execution order:

| step | thread 1: TaskRunner.kill | thread 2: TaskRunner.run |
| ---- | ------------------------- | ------------------------ |
| 1 | killed = true | |
| 2 | | if (killed) { |
| 3 | | throw new TaskKilledException |
| 4 | | case _: TaskKilledException _: InterruptedException if task.killed => |
| 5 | task.kill(interruptThread): interruptThread is true | |
| 6 | | execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) |
| 7 | | localEndpoint.send(StatusUpdate(taskId, state, serializedData)): in LocalBackend |

Then `localEndpoint.send(StatusUpdate(taskId, state, serializedData))` will throw `InterruptedException`. This will prevent the executor from updating the task status and hang the application. A failure caused by the above issue can be seen here: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44062/consoleFull Since `receivers` is an unbounded `LinkedBlockingQueue`, we can just use `LinkedBlockingQueue.offer` to resolve this issue.

Author: zsxwing. Closes #9198 from zsxwing/dont-interrupt-send.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7bb6d31c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7bb6d31c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7bb6d31c Branch: refs/heads/master Commit: 7bb6d31cff279776f90744407291682774cfe1c2 Parents: 42d225f Author: zsxwing Authored: Thu Oct 22 11:31:47 2015 -0700 Committer: Marcelo Vanzin Committed: Thu Oct 22 11:31:47 2015 -0700 -- .../scala/org/apache/spark/rpc/netty/Dispatcher.scala | 10 +- 1 file changed, 5 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7bb6d31c/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala -- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala index f1a8273..7bf44a6 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala @@ -66,7 +66,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { } val data = endpoints.get(name) endpointRefs.put(data.endpoint, data.ref) - receivers.put(data) // for the OnStart message + receivers.offer(data) // for the OnStart message } endpointRef } @@ -80,7 +80,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { val data = endpoints.remove(name) if (data != null) { data.inbox.stop() - receivers.put(data) // for the OnStop message + receivers.offer(data) // for the OnStop message } // Don't clean `endpointRefs` here because it's possible that some messages are being processed // now and they can use `getRpcEndpointRef`. So `endpointRefs` will be cleaned in Inbox via @@ -163,7 +163,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { true } else { data.inbox.post(createMessageFn(data.ref)) -receivers.put(data) +receivers.offer(data) false } } @@ -183,7 +183,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { // Stop all endpoints. This will queue all endpoints for processing by the message loops.
endpoints.keySet().asScala.foreach(unregisterRpcEndpoint) // Enqueue a message that tells the message loops to stop. -receivers.put(PoisonPill) +receivers.offer(PoisonPill) threadpool.shutdown() } @@ -218,7 +218,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { val data = receivers.take() if (data == PoisonPill) { // Put PoisonPill back so that other MessageLoops can see it. - receivers.put(PoisonPill) + receivers.offer(PoisonPill) return } data.inbox.process(Dispatcher.this) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
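To see why switching from `put` to `offer` removes the interruption hazard, here is a small, self-contained JVM experiment. It relies only on `java.util.concurrent.LinkedBlockingQueue`; the behaviour shown is standard JDK semantics (not Spark code): on an unbounded queue, `offer` succeeds immediately and ignores the interrupt flag, while `put` acquires its lock interruptibly and can throw `InterruptedException`.

```scala
import java.util.concurrent.LinkedBlockingQueue

object OfferVsPut extends App {
  val receivers = new LinkedBlockingQueue[String]()   // unbounded, like the Dispatcher's queue

  Thread.currentThread().interrupt()                  // simulate TaskRunner.kill interrupting us
  val accepted = receivers.offer("StatusUpdate")      // cannot throw InterruptedException
  println(s"offer accepted: $accepted")               // offer accepted: true
  Thread.interrupted()                                // clear the flag before the next step

  Thread.currentThread().interrupt()
  try receivers.put("StatusUpdate")                   // throws because the thread is interrupted
  catch { case _: InterruptedException => println("put was interrupted") }
}
```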
spark git commit: [SPARK-11121][CORE] Correct the TaskLocation type
Repository: spark Updated Branches: refs/heads/master 1d9733271 -> c03b6d115 [SPARK-11121][CORE] Correct the TaskLocation type Correct the logic to return `HDFSCacheTaskLocation` instance when the input `str` is a in memory location. Author: zhichao.liCloses #9096 from zhichao-li/uselessBranch. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c03b6d11 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c03b6d11 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c03b6d11 Branch: refs/heads/master Commit: c03b6d11589102b91f08728519e8520025db91e1 Parents: 1d97332 Author: zhichao.li Authored: Thu Oct 22 03:59:26 2015 -0700 Committer: Sean Owen Committed: Thu Oct 22 03:59:26 2015 -0700 -- .../scala/org/apache/spark/scheduler/TaskLocation.scala | 2 +- .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 11 --- 2 files changed, 9 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c03b6d11/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala index da07ce2..1b65926 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala @@ -67,7 +67,7 @@ private[spark] object TaskLocation { if (hstr.equals(str)) { new HostTaskLocation(str) } else { - new HostTaskLocation(hstr) + new HDFSCacheTaskLocation(hstr) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/c03b6d11/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index f0eadf2..695523c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -759,9 +759,9 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3")) val taskSet = FakeTask.createTaskSet(3, - Seq(HostTaskLocation("host1")), - Seq(HostTaskLocation("host2")), - Seq(HDFSCacheTaskLocation("host3"))) + Seq(TaskLocation("host1")), + Seq(TaskLocation("host2")), + Seq(TaskLocation("hdfs_cache_host3"))) val clock = new ManualClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY))) @@ -776,6 +776,11 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager.myLocalityLevels.sameElements(Array(ANY))) } + test("Test TaskLocation for different host type.") { +assert(TaskLocation("host1") === HostTaskLocation("host1")) +assert(TaskLocation("hdfs_cache_host1") === HDFSCacheTaskLocation("host1")) + } + def createTaskResult(id: Int): DirectTaskResult[Int] = { val valueSer = SparkEnv.get.serializer.newInstance() new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
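The fix above makes the `hdfs_cache_` prefix map to an `HDFSCacheTaskLocation` instead of falling through to `HostTaskLocation`. A standalone re-creation of that parsing logic, with illustrative class names rather than Spark's internals, looks roughly like this:

```scala
// Illustrative stand-ins for Spark's TaskLocation hierarchy; only the parsing rule
// mirrors the commit.
sealed trait Location { def host: String }
case class HostLocation(host: String) extends Location
case class HdfsCacheLocation(host: String) extends Location

object Location {
  private val inMemoryPrefix = "hdfs_cache_"

  def apply(str: String): Location = {
    val stripped = str.stripPrefix(inMemoryPrefix)
    if (stripped == str) HostLocation(str)       // no prefix: a plain host
    else HdfsCacheLocation(stripped)             // prefix present: data cached in HDFS on that host
  }
}

object LocationDemo extends App {
  assert(Location("host1") == HostLocation("host1"))
  // Before the fix this returned a host location; the commit makes it an HDFS-cache location.
  assert(Location("hdfs_cache_host1") == HdfsCacheLocation("host1"))
}
```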
spark git commit: [SPARK-9735][SQL] Respect the user-specified schema over the inferred partition schema for HadoopFsRelation
Repository: spark Updated Branches: refs/heads/master 3535b91dd -> d4950e6be [SPARK-9735][SQL] Respect the user specified schema than the infer partition schema for HadoopFsRelation To enable the unit test of `hadoopFsRelationSuite.Partition column type casting`. It previously threw exception like below, as we treat the auto infer partition schema with higher priority than the user specified one. ``` java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.spark.unsafe.types.UTF8String at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:45) at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:220) at org.apache.spark.sql.catalyst.expressions.JoinedRow.getUTF8String(JoinedRow.scala:102) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:62) at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212) at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:903) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:903) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1846) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1846) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 07:44:01.344 ERROR org.apache.spark.executor.Executor: Exception in task 14.0 in stage 3.0 (TID 206) java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.spark.unsafe.types.UTF8String at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:45) at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:220) at org.apache.spark.sql.catalyst.expressions.JoinedRow.getUTF8String(JoinedRow.scala:102) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:62) at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212) at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
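The ClassCastException above comes from keeping the inferred partition column type (integer) when the user declared the column as a string. A minimal, hedged sketch of the intended resolution rule, using a simplified `Field` type rather than Spark's `StructType`, is:

```scala
// Illustrative only: when the user supplied a schema, its declared type for a
// partition column wins over the type inferred from the directory values.
case class Field(name: String, dataType: String)

def resolvePartitionSchema(inferred: Seq[Field], userSpecified: Option[Seq[Field]]): Seq[Field] =
  userSpecified match {
    case None       => inferred
    case Some(user) =>
      val declared = user.map(f => f.name -> f.dataType).toMap
      inferred.map(f => f.copy(dataType = declared.getOrElse(f.name, f.dataType)))
  }

// resolvePartitionSchema(Seq(Field("p", "IntegerType")), Some(Seq(Field("p", "StringType"))))
//   == Seq(Field("p", "StringType"))   // the user's type is respected, avoiding the cast error
```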
spark git commit: [SPARK-11116][SQL] First Draft of Dataset API
Repository: spark Updated Branches: refs/heads/master 188ea348f -> 53e83a3a7

[SPARK-11116][SQL] First Draft of Dataset API

*This PR adds a new experimental API to Spark, tentatively named Datasets.*

A `Dataset` is a strongly-typed collection of objects that can be transformed in parallel using functional or relational operations. Example usage is as follows:

### Functional
```scala
> val ds: Dataset[Int] = Seq(1, 2, 3).toDS()
> ds.filter(_ % 1 == 0).collect()
res1: Array[Int] = Array(1, 2, 3)
```

### Relational
```scala
scala> ds.toDF().show()
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
+-----+

> ds.select(expr("value + 1").as[Int]).collect()
res11: Array[Int] = Array(2, 3, 4)
```

## Comparison to RDDs
A `Dataset` differs from an `RDD` in the following ways:
 - The creation of a `Dataset` requires the presence of an explicit `Encoder` that can be used to serialize the object into a binary format. Encoders are also capable of mapping the schema of a given object to the Spark SQL type system. In contrast, RDDs rely on runtime reflection based serialization.
 - Internally, a `Dataset` is represented by a Catalyst logical plan and the data is stored in the encoded form. This representation allows for additional logical operations and enables many operations (sorting, shuffling, etc.) to be performed without deserializing to an object.

A `Dataset` can be converted to an `RDD` by calling the `.rdd` method.

## Comparison to DataFrames
A `Dataset` can be thought of as a specialized DataFrame, where the elements map to a specific JVM object type, instead of to a generic `Row` container. A DataFrame can be transformed into a specific Dataset by calling `df.as[ElementType]`. Similarly, you can transform a strongly-typed `Dataset` to a generic DataFrame by calling `ds.toDF()`.

## Implementation Status and TODOs
This is a rough cut at the least controversial parts of the API. The primary purpose here is to get something committed so that we can better parallelize further work and get early feedback on the API. The following is being deferred to future PRs:
 - Joins and Aggregations (prototype here https://github.com/apache/spark/commit/f11f91e6f08c8cf389b8388b626cd29eec32d937)
 - Support for Java

Additionally, the responsibility for binding an encoder to a given schema is currently done in a fairly ad-hoc fashion. This is an internal detail, and what we are doing today works for the cases we care about. However, as we add more APIs we'll probably need to do this in a more principled way (i.e. separate resolution from binding as we do in DataFrames).

## COMPATIBILITY NOTE
Long term we plan to make `DataFrame` extend `Dataset[Row]`. However, making this change to the class hierarchy would break the function signatures for the existing function operations (map, flatMap, etc). As such, this class should be considered a preview of the final API. Changes will be made to the interface after Spark 1.6.

Author: Michael Armbrust. Closes #9190 from marmbrus/dataset-infra.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53e83a3a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53e83a3a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53e83a3a Branch: refs/heads/master Commit: 53e83a3a77cafc2ccd0764ecdb8b3ba735bc51fc Parents: 188ea34 Author: Michael Armbrust Authored: Thu Oct 22 15:20:17 2015 -0700 Committer: Reynold Xin Committed: Thu Oct 22 15:20:17 2015 -0700 -- .../spark/sql/catalyst/ScalaReflection.scala| 8 +- .../sql/catalyst/encoders/ClassEncoder.scala| 38 +- .../spark/sql/catalyst/encoders/Encoder.scala | 19 +- .../sql/catalyst/encoders/ProductEncoder.scala | 12 +- .../sql/catalyst/encoders/primitiveTypes.scala | 100 + .../spark/sql/catalyst/encoders/tuples.scala| 173 .../sql/catalyst/expressions/AttributeMap.scala | 7 + .../sql/catalyst/expressions/AttributeSet.scala | 4 + .../expressions/complexTypeCreator.scala| 8 + .../sql/catalyst/expressions/package.scala | 12 + .../catalyst/plans/logical/basicOperators.scala | 72 +++- .../encoders/PrimitiveEncoderSuite.scala| 43 ++ .../catalyst/encoders/ProductEncoderSuite.scala | 21 +- .../scala/org/apache/spark/sql/Column.scala | 15 + .../scala/org/apache/spark/sql/DataFrame.scala | 11 + .../scala/org/apache/spark/sql/Dataset.scala| 392 +++ .../org/apache/spark/sql/DatasetHolder.scala| 30 ++ .../org/apache/spark/sql/GroupedDataset.scala | 68 .../scala/org/apache/spark/sql/SQLContext.scala | 12 + .../org/apache/spark/sql/SQLImplicits.scala | 16 +- .../spark/sql/execution/GroupedIterator.scala | 141 +++
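A short usage sketch tying the conversions described above together. It assumes a Spark 1.6-era `SQLContext` named `sqlContext` with its implicits imported (as in the examples from the commit message); the `Person` case class is illustrative, and the exact method availability in this first draft may differ.

```scala
import sqlContext.implicits._   // assumes a SQLContext named sqlContext is already in scope

case class Person(name: String, age: Int)

val df = Seq(Person("ada", 36), Person("alan", 41)).toDF()  // untyped DataFrame of Rows
val ds = df.as[Person]                                      // strongly-typed Dataset[Person]
val names = ds.filter(_.age > 40).map(_.name).collect()     // typed, functional operations
val back = ds.toDF()                                        // back to a generic DataFrame
```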
[2/2] spark git commit: [SPARK-10812] [YARN] Fix shutdown of token renewer.
[SPARK-10812] [YARN] Fix shutdown of token renewer. A recent change to fix the referenced bug caused this exception in the `SparkContext.stop()` path: org.apache.spark.SparkException: YarnSparkHadoopUtil is not available in non-YARN mode! at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$.get(YarnSparkHadoopUtil.scala:167) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.stop(YarnClientSchedulerBackend.scala:182) at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:440) at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1579) at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1730) at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185) at org.apache.spark.SparkContext.stop(SparkContext.scala:1729) Author: Marcelo VanzinCloses #8996 from vanzin/SPARK-10812. (cherry picked from commit 4b74755122d51edb1257d4f3785fb24508681068) Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e405c2a1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e405c2a1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e405c2a1 Branch: refs/heads/branch-1.5 Commit: e405c2a1f6c75b50324de1bd18363b031d34f3d0 Parents: c49e0c3 Author: Marcelo Vanzin Authored: Wed Oct 7 11:38:07 2015 -0700 Committer: Marcelo Vanzin Committed: Thu Oct 22 13:14:26 2015 -0700 -- .../spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e405c2a1/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index d06d951..36d5759 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -178,8 +178,8 @@ private[spark] class YarnClientSchedulerBackend( monitorThread.stopMonitor() } super.stop() -client.stop() YarnSparkHadoopUtil.get.stopExecutorDelegationTokenRenewer() +client.stop() logInfo("Stopped") } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[1/2] spark git commit: [SPARK-10812] [YARN] Spark hadoop util support switching to yarn
Repository: spark Updated Branches: refs/heads/branch-1.5 f9ad0e543 -> e405c2a1f [SPARK-10812] [YARN] Spark hadoop util support switching to yarn While this is likely not a huge issue for real production systems, for test systems which may setup a Spark Context and tear it down and stand up a Spark Context with a different master (e.g. some local mode & some yarn mode) tests this cane be an issue. Discovered during work on spark-testing-base on Spark 1.4.1, but seems like the logic that triggers it is present in master (see SparkHadoopUtil object). A valid work around for users encountering this issue is to fork a different JVM, however this can be heavy weight. ``` [info] SampleMiniClusterTest: [info] Exception encountered when attempting to run a suite with class name: com.holdenkarau.spark.testing.SampleMiniClusterTest *** ABORTED *** [info] java.lang.ClassCastException: org.apache.spark.deploy.SparkHadoopUtil cannot be cast to org.apache.spark.deploy.yarn.YarnSparkHadoopUtil [info] at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$.get(YarnSparkHadoopUtil.scala:163) [info] at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:257) [info] at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:561) [info] at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:115) [info] at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57) [info] at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141) [info] at org.apache.spark.SparkContext.(SparkContext.scala:497) [info] at com.holdenkarau.spark.testing.SharedMiniCluster$class.setup(SharedMiniCluster.scala:186) [info] at com.holdenkarau.spark.testing.SampleMiniClusterTest.setup(SampleMiniClusterTest.scala:26) [info] at com.holdenkarau.spark.testing.SharedMiniCluster$class.beforeAll(SharedMiniCluster.scala:103) ``` Author: Holden KarauCloses #8911 from holdenk/SPARK-10812-spark-hadoop-util-support-switching-to-yarn. (cherry picked from commit d8d50ed388d2e695b69d2b93a620045ef2f0bc18) Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c49e0c3f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c49e0c3f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c49e0c3f Branch: refs/heads/branch-1.5 Commit: c49e0c3f6d25aa15b7cc25db0e9ae5a869184480 Parents: f9ad0e5 Author: Holden Karau Authored: Mon Sep 28 06:33:45 2015 -0700 Committer: Marcelo Vanzin Committed: Thu Oct 22 13:14:21 2015 -0700 -- .../scala/org/apache/spark/SparkContext.scala | 2 ++ .../apache/spark/deploy/SparkHadoopUtil.scala | 30 ++-- .../org/apache/spark/deploy/yarn/Client.scala | 6 +++- .../deploy/yarn/YarnSparkHadoopUtilSuite.scala | 12 4 files changed, 34 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c49e0c3f/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 011e19f..2a2fa75 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1750,6 +1750,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } SparkEnv.set(null) } +// Unset YARN mode system env variable, to allow switching between cluster types. 
+System.clearProperty("SPARK_YARN_MODE") SparkContext.clearActiveContext() logInfo("Successfully stopped SparkContext") } http://git-wip-us.apache.org/repos/asf/spark/blob/c49e0c3f/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index dda4216..1157ee0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -380,20 +380,13 @@ class SparkHadoopUtil extends Logging { object SparkHadoopUtil { - private val hadoop = { -val yarnMode = java.lang.Boolean.valueOf( -System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) -if (yarnMode) { - try { -Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil") - .newInstance() - .asInstanceOf[SparkHadoopUtil] - } catch { - case e: Exception => throw new
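The essential change in the diff above is that the YARN-or-not decision is no longer frozen in a `val` at class-load time; it is re-evaluated so that clearing `SPARK_YARN_MODE` (which `SparkContext.stop()` now does) lets a later context use a different cluster type in the same JVM. A stripped-down, plain-Scala sketch of that idea, with stand-in classes instead of Spark's:

```scala
// Stand-in classes; only the selection logic mirrors the commit's intent.
class HadoopUtil
class YarnHadoopUtil extends HadoopUtil

object HadoopUtil {
  // Before the fix: decided once and cached forever ->
  //   private val hadoop: HadoopUtil = select()
  // After the fix: consult SPARK_YARN_MODE on each access.
  def get: HadoopUtil = select()

  private def select(): HadoopUtil = {
    val yarnMode = java.lang.Boolean.parseBoolean(
      sys.props.get("SPARK_YARN_MODE").orElse(sys.env.get("SPARK_YARN_MODE")).getOrElse("false"))
    if (yarnMode) new YarnHadoopUtil else new HadoopUtil
  }
}
```

The real code keeps more state (it caches the chosen instance); this sketch re-creates it on every call for brevity, which is enough to show why clearing the property restores non-YARN behaviour.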
spark git commit: [SPARK-11242][SQL] In conf/spark-env.sh.template SPARK_DRIVER_MEMORY is documented incorrectly
Repository: spark Updated Branches: refs/heads/master d4950e6be -> 188ea348f [SPARK-11242][SQL] In conf/spark-env.sh.template SPARK_DRIVER_MEMORY is documented incorrectly Minor fix on the comment Author: guoxiCloses #9201 from xguo27/SPARK-11242. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/188ea348 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/188ea348 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/188ea348 Branch: refs/heads/master Commit: 188ea348fdcf877d86f3c433cd15f6468fe3b42a Parents: d4950e6 Author: guoxi Authored: Thu Oct 22 13:56:18 2015 -0700 Committer: Sean Owen Committed: Thu Oct 22 13:56:18 2015 -0700 -- conf/spark-env.sh.template | 8 1 file changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/188ea348/conf/spark-env.sh.template -- diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template index 990ded4..771251f 100755 --- a/conf/spark-env.sh.template +++ b/conf/spark-env.sh.template @@ -36,10 +36,10 @@ # Options read in YARN client mode # - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files -# - SPARK_EXECUTOR_INSTANCES, Number of workers to start (Default: 2) -# - SPARK_EXECUTOR_CORES, Number of cores for the workers (Default: 1). -# - SPARK_EXECUTOR_MEMORY, Memory per Worker (e.g. 1000M, 2G) (Default: 1G) -# - SPARK_DRIVER_MEMORY, Memory for Master (e.g. 1000M, 2G) (Default: 1G) +# - SPARK_EXECUTOR_INSTANCES, Number of executors to start (Default: 2) +# - SPARK_EXECUTOR_CORES, Number of cores for the executors (Default: 1). +# - SPARK_EXECUTOR_MEMORY, Memory per Executor (e.g. 1000M, 2G) (Default: 1G) +# - SPARK_DRIVER_MEMORY, Memory for Driver (e.g. 1000M, 2G) (Default: 1G) # - SPARK_YARN_APP_NAME, The name of your application (Default: Spark) # - SPARK_YARN_QUEUE, The hadoop queue to use for allocation requests (Default: âdefaultâ) # - SPARK_YARN_DIST_FILES, Comma separated list of files to be distributed with the job. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-7021] Add JUnit output for Python unit tests
Repository: spark Updated Branches: refs/heads/master 53e83a3a7 -> 163d53e82 [SPARK-7021] Add JUnit output for Python unit tests WIP Author: Gábor LiptákCloses #8323 from gliptak/SPARK-7021. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/163d53e8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/163d53e8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/163d53e8 Branch: refs/heads/master Commit: 163d53e829c166f061589cc379f61642d4c9a40f Parents: 53e83a3 Author: Gábor Lipták Authored: Thu Oct 22 15:27:11 2015 -0700 Committer: Davies Liu Committed: Thu Oct 22 15:27:11 2015 -0700 -- python/pyspark/ml/tests.py| 9 - python/pyspark/mllib/tests.py | 9 - python/pyspark/sql/tests.py | 9 - python/pyspark/streaming/tests.py | 11 ++- python/pyspark/tests.py | 19 ++- 5 files changed, 48 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/163d53e8/python/pyspark/ml/tests.py -- diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 6a2577d..7a16cf5 100644 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -20,6 +20,10 @@ Unit tests for Spark ML Python APIs. """ import sys +try: +import xmlrunner +except ImportError: +xmlrunner = None if sys.version_info[:2] <= (2, 6): try: @@ -368,4 +372,7 @@ class CrossValidatorTests(PySparkTestCase): if __name__ == "__main__": -unittest.main() +if xmlrunner: + unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports')) +else: +unittest.main() http://git-wip-us.apache.org/repos/asf/spark/blob/163d53e8/python/pyspark/mllib/tests.py -- diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 2ad69a0..f8e8e0e 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -31,6 +31,10 @@ from numpy import ( from numpy import sum as array_sum from py4j.protocol import Py4JJavaError +try: +import xmlrunner +except ImportError: +xmlrunner = None if sys.version > '3': basestring = str @@ -1538,7 +1542,10 @@ class MLUtilsTests(MLlibTestCase): if __name__ == "__main__": if not _have_scipy: print("NOTE: Skipping SciPy tests as it does not seem to be installed") -unittest.main() +if xmlrunner: + unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports')) +else: +unittest.main() if not _have_scipy: print("NOTE: SciPy tests were skipped as it does not seem to be installed") sc.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/163d53e8/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index f465e1f..6356d4b 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -31,6 +31,10 @@ import time import datetime import py4j +try: +import xmlrunner +except ImportError: +xmlrunner = None if sys.version_info[:2] <= (2, 6): try: @@ -1222,4 +1226,7 @@ class HiveContextSQLTests(ReusedPySparkTestCase): if __name__ == "__main__": -unittest.main() +if xmlrunner: + unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports')) +else: +unittest.main() http://git-wip-us.apache.org/repos/asf/spark/blob/163d53e8/python/pyspark/streaming/tests.py -- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 4963425..2c908da 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -27,6 +27,11 @@ import struct import shutil from functools import reduce +try: +import xmlrunner +except ImportError: +xmlrunner = None + if 
sys.version_info[:2] <= (2, 6): try: import unittest2 as unittest @@ -1303,4 +1308,8 @@ if __name__ == "__main__": for testcase in testcases: sys.stderr.write("[Running %s]\n" % (testcase)) tests = unittest.TestLoader().loadTestsFromTestCase(testcase) -unittest.TextTestRunner(verbosity=3).run(tests) +if xmlrunner: +unittest.main(tests, verbosity=3, + testRunner=xmlrunner.XMLTestRunner(output='target/test-reports')) +else: +unittest.TextTestRunner(verbosity=3).run(tests)
spark git commit: [SPARK-11251] Fix page size calculation in local mode
Repository: spark Updated Branches: refs/heads/branch-1.5 e405c2a1f -> a76cf51ed [SPARK-11251] Fix page size calculation in local mode ``` // My machine only has 8 cores $ bin/spark-shell --master local[32] scala> val df = sc.parallelize(Seq((1, 1), (2, 2))).toDF("a", "b") scala> df.as("x").join(df.as("y"), $"x.a" === $"y.a").count() Caused by: java.io.IOException: Unable to acquire 2097152 bytes of memory at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351) ``` Author: Andrew OrCloses #9209 from andrewor14/fix-local-page-size. (cherry picked from commit 34e71c6d89c1f2b6236dbf0d75cd12da08003c84) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a76cf51e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a76cf51e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a76cf51e Branch: refs/heads/branch-1.5 Commit: a76cf51ed91d99c88f301ec85f3cda1288bcf346 Parents: e405c2a Author: Andrew Or Authored: Thu Oct 22 15:58:08 2015 -0700 Committer: Reynold Xin Committed: Thu Oct 22 15:58:17 2015 -0700 -- .../scala/org/apache/spark/SparkContext.scala | 48 ++-- .../main/scala/org/apache/spark/SparkEnv.scala | 4 +- .../OutputCommitCoordinatorSuite.scala | 3 +- 3 files changed, 40 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a76cf51e/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 2a2fa75..a8f6047 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -274,7 +274,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli conf: SparkConf, isLocal: Boolean, listenerBus: LiveListenerBus): SparkEnv = { -SparkEnv.createDriverEnv(conf, isLocal, listenerBus) +SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master)) } private[spark] def env: SparkEnv = _env @@ -2548,24 +2548,28 @@ object SparkContext extends Logging { } /** + * The number of driver cores to use for execution in local mode, 0 otherwise. + */ + private[spark] def numDriverCores(master: String): Int = { +def convertToInt(threads: String): Int = { + if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt +} +master match { + case "local" => 1 + case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads) + case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads) + case _ => 0 // driver is not used for execution +} + } + + /** * Create a task scheduler based on a given master URL. * Return a 2-tuple of the scheduler backend and the task scheduler. 
*/ private def createTaskScheduler( sc: SparkContext, master: String): (SchedulerBackend, TaskScheduler) = { -// Regular expression used for local[N] and local[*] master formats -val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r -// Regular expression for local[N, maxRetries], used in tests with failing tasks -val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r -// Regular expression for simulating a Spark cluster of [N, cores, memory] locally -val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r -// Regular expression for connecting to Spark deploy clusters -val SPARK_REGEX = """spark://(.*)""".r -// Regular expression for connection to Mesos cluster by mesos:// or zk:// url -val MESOS_REGEX = """(mesos|zk)://.*""".r -// Regular expression for connection to Simr cluster -val SIMR_REGEX = """simr://(.*)""".r +import SparkMasterRegex._ // When running locally, don't try to re-execute tasks on failure. val MAX_LOCAL_TASK_FAILURES = 1 @@ -2707,6 +2711,24 @@ object SparkContext extends Logging { } /** + * A collection of regexes for extracting information from the master string. + */ +private object SparkMasterRegex { + // Regular expression used for local[N] and local[*] master formats + val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r + // Regular expression for local[N, maxRetries], used in tests with failing tasks + val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r + // Regular expression for simulating a Spark cluster of [N, cores, memory] locally + val
spark git commit: [SPARK-11251] Fix page size calculation in local mode
Repository: spark Updated Branches: refs/heads/master 163d53e82 -> 34e71c6d8 [SPARK-11251] Fix page size calculation in local mode ``` // My machine only has 8 cores $ bin/spark-shell --master local[32] scala> val df = sc.parallelize(Seq((1, 1), (2, 2))).toDF("a", "b") scala> df.as("x").join(df.as("y"), $"x.a" === $"y.a").count() Caused by: java.io.IOException: Unable to acquire 2097152 bytes of memory at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351) ``` Author: Andrew OrCloses #9209 from andrewor14/fix-local-page-size. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34e71c6d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34e71c6d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34e71c6d Branch: refs/heads/master Commit: 34e71c6d89c1f2b6236dbf0d75cd12da08003c84 Parents: 163d53e Author: Andrew Or Authored: Thu Oct 22 15:58:08 2015 -0700 Committer: Reynold Xin Committed: Thu Oct 22 15:58:08 2015 -0700 -- .../scala/org/apache/spark/SparkContext.scala | 48 ++-- .../main/scala/org/apache/spark/SparkEnv.scala | 4 +- .../OutputCommitCoordinatorSuite.scala | 3 +- 3 files changed, 40 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/34e71c6d/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ccba3ed..a6857b4 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -269,7 +269,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli conf: SparkConf, isLocal: Boolean, listenerBus: LiveListenerBus): SparkEnv = { -SparkEnv.createDriverEnv(conf, isLocal, listenerBus) +SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master)) } private[spark] def env: SparkEnv = _env @@ -2561,24 +2561,28 @@ object SparkContext extends Logging { } /** + * The number of driver cores to use for execution in local mode, 0 otherwise. + */ + private[spark] def numDriverCores(master: String): Int = { +def convertToInt(threads: String): Int = { + if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt +} +master match { + case "local" => 1 + case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads) + case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads) + case _ => 0 // driver is not used for execution +} + } + + /** * Create a task scheduler based on a given master URL. * Return a 2-tuple of the scheduler backend and the task scheduler. 
*/ private def createTaskScheduler( sc: SparkContext, master: String): (SchedulerBackend, TaskScheduler) = { -// Regular expression used for local[N] and local[*] master formats -val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r -// Regular expression for local[N, maxRetries], used in tests with failing tasks -val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r -// Regular expression for simulating a Spark cluster of [N, cores, memory] locally -val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r -// Regular expression for connecting to Spark deploy clusters -val SPARK_REGEX = """spark://(.*)""".r -// Regular expression for connection to Mesos cluster by mesos:// or zk:// url -val MESOS_REGEX = """(mesos|zk)://.*""".r -// Regular expression for connection to Simr cluster -val SIMR_REGEX = """simr://(.*)""".r +import SparkMasterRegex._ // When running locally, don't try to re-execute tasks on failure. val MAX_LOCAL_TASK_FAILURES = 1 @@ -2720,6 +2724,24 @@ object SparkContext extends Logging { } /** + * A collection of regexes for extracting information from the master string. + */ +private object SparkMasterRegex { + // Regular expression used for local[N] and local[*] master formats + val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r + // Regular expression for local[N, maxRetries], used in tests with failing tasks + val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r + // Regular expression for simulating a Spark cluster of [N, cores, memory] locally + val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r + // Regular expression for
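Both copies of this commit (branch-1.5 earlier, master here) add the same `numDriverCores` helper. The logic is small enough to restate as a standalone, Spark-free snippet; the regexes are copied from the diff, so this also shows why `--master local[32]` now gives the page-size calculation 32 cores instead of the machine's physical core count.

```scala
object LocalCores {
  private val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  private val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

  private def convertToInt(threads: String): Int =
    if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt

  def numDriverCores(master: String): Int = master match {
    case "local"                            => 1
    case LOCAL_N_REGEX(threads)             => convertToInt(threads)
    case LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
    case _                                  => 0   // the driver does not execute tasks
  }
}

// LocalCores.numDriverCores("local[32]") == 32; LocalCores.numDriverCores("yarn-client") == 0
```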
[2/2] spark git commit: Preparing development version 1.5.3-SNAPSHOT
Preparing development version 1.5.3-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be3e3434 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be3e3434 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be3e3434 Branch: refs/heads/branch-1.5 Commit: be3e343453c032e0ae01bcaa5d359de9ef02e950 Parents: ad6ade1 Author: Patrick WendellAuthored: Thu Oct 22 16:02:11 2015 -0700 Committer: Patrick Wendell Committed: Thu Oct 22 16:02:11 2015 -0700 -- assembly/pom.xml| 2 +- bagel/pom.xml | 2 +- core/pom.xml| 2 +- examples/pom.xml| 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt-assembly/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml| 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl-assembly/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- launcher/pom.xml| 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml| 2 +- pom.xml | 2 +- repl/pom.xml| 2 +- sql/catalyst/pom.xml| 2 +- sql/core/pom.xml| 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml| 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- unsafe/pom.xml | 2 +- yarn/pom.xml| 2 +- 33 files changed, 33 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/be3e3434/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index f0c6c0c..6114f8c 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.2 +1.5.3-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/be3e3434/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index fdbbf9d..dd9eb9e 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.2 +1.5.3-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/be3e3434/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index bdf355f..350aaab 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.2 +1.5.3-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/be3e3434/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 6b7f72c..0d87f37 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.2 +1.5.3-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/be3e3434/external/flume-assembly/pom.xml -- diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index 4d43903..3982b3d 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.2 +1.5.3-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/be3e3434/external/flume-sink/pom.xml -- diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index e3fa0c0..033f222 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.2 +1.5.3-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/be3e3434/external/flume/pom.xml -- diff --git a/external/flume/pom.xml b/external/flume/pom.xml index 700e912..74e9cf4 100644 --- 
a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -21,7
[1/2] spark git commit: Preparing Spark release v1.5.2-rc1
Repository: spark Updated Branches: refs/heads/branch-1.5 a76cf51ed -> be3e34345 Preparing Spark release v1.5.2-rc1 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad6ade12 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad6ade12 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad6ade12 Branch: refs/heads/branch-1.5 Commit: ad6ade12412361b5998898e5ce6b1e007fea02eb Parents: a76cf51 Author: Patrick WendellAuthored: Thu Oct 22 16:02:05 2015 -0700 Committer: Patrick Wendell Committed: Thu Oct 22 16:02:05 2015 -0700 -- assembly/pom.xml| 2 +- bagel/pom.xml | 2 +- core/pom.xml| 2 +- examples/pom.xml| 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt-assembly/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml| 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl-assembly/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- launcher/pom.xml| 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml| 2 +- pom.xml | 2 +- repl/pom.xml| 2 +- sql/catalyst/pom.xml| 2 +- sql/core/pom.xml| 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml| 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- unsafe/pom.xml | 2 +- yarn/pom.xml| 2 +- 33 files changed, 33 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ad6ade12/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 7671ba2..f0c6c0c 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.2-SNAPSHOT +1.5.2 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/ad6ade12/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index 02e920d..fdbbf9d 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.2-SNAPSHOT +1.5.2 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/ad6ade12/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 03d26df..bdf355f 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.2-SNAPSHOT +1.5.2 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/ad6ade12/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index eb1910e..6b7f72c 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.2-SNAPSHOT +1.5.2 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/ad6ade12/external/flume-assembly/pom.xml -- diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index 0de2f03..4d43903 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.2-SNAPSHOT +1.5.2 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/ad6ade12/external/flume-sink/pom.xml -- diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index 66ab1b2..e3fa0c0 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.2-SNAPSHOT +1.5.2 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/ad6ade12/external/flume/pom.xml -- diff --git a/external/flume/pom.xml 
b/external/flume/pom.xml index c058490..700e912
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.5.2-rc1 [created] ad6ade124 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11098][CORE] Add Outbox to cache the sending messages to resolve the message disorder issue
Repository: spark Updated Branches: refs/heads/master 34e71c6d8 -> a88c66ca8 [SPARK-11098][CORE] Add Outbox to cache the sending messages to resolve the message disorder issue The current NettyRpc has a message order issue because it uses a thread pool to send messages. E.g., running the following two lines in the same thread, ``` ref.send("A") ref.send("B") ``` The remote endpoint may see "B" before "A" because sending "A" and "B" are in parallel. To resolve this issue, this PR added an outbox for each connection, and if we are connecting to the remote node when sending messages, just cache the sending messages in the outbox and send them one by one when the connection is established. Author: zsxwingCloses #9197 from zsxwing/rpc-outbox. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a88c66ca Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a88c66ca Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a88c66ca Branch: refs/heads/master Commit: a88c66ca8780c7228dc909f904d31cd9464ee0e3 Parents: 34e71c6 Author: zsxwing Authored: Thu Oct 22 21:01:01 2015 -0700 Committer: Reynold Xin Committed: Thu Oct 22 21:01:01 2015 -0700 -- .../apache/spark/rpc/netty/NettyRpcEnv.scala| 145 +++- .../org/apache/spark/rpc/netty/Outbox.scala | 222 +++ 2 files changed, 310 insertions(+), 57 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a88c66ca/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala -- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index e01cf1a..284284e 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -20,6 +20,7 @@ import java.io._ import java.net.{InetSocketAddress, URI} import java.nio.ByteBuffer import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicBoolean import javax.annotation.concurrent.GuardedBy import scala.collection.mutable @@ -70,12 +71,30 @@ private[netty] class NettyRpcEnv( // Because TransportClientFactory.createClient is blocking, we need to run it in this thread pool // to implement non-blocking send/ask. // TODO: a non-blocking TransportClientFactory.createClient in future - private val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool( + private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool( "netty-rpc-connection", conf.getInt("spark.rpc.connect.threads", 64)) @volatile private var server: TransportServer = _ + private val stopped = new AtomicBoolean(false) + + /** + * A map for [[RpcAddress]] and [[Outbox]]. When we are connecting to a remote [[RpcAddress]], + * we just put messages to its [[Outbox]] to implement a non-blocking `send` method. + */ + private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]() + + /** + * Remove the address's Outbox and stop it. 
+ */ + private[netty] def removeOutbox(address: RpcAddress): Unit = { +val outbox = outboxes.remove(address) +if (outbox != null) { + outbox.stop() +} + } + def start(port: Int): Unit = { val bootstraps: java.util.List[TransportServerBootstrap] = if (securityManager.isAuthenticationEnabled()) { @@ -116,6 +135,30 @@ private[netty] class NettyRpcEnv( dispatcher.stop(endpointRef) } + private def postToOutbox(address: RpcAddress, message: OutboxMessage): Unit = { +val targetOutbox = { + val outbox = outboxes.get(address) + if (outbox == null) { +val newOutbox = new Outbox(this, address) +val oldOutbox = outboxes.putIfAbsent(address, newOutbox) +if (oldOutbox == null) { + newOutbox +} else { + oldOutbox +} + } else { +outbox + } +} +if (stopped.get) { + // It's possible that we put `targetOutbox` after stopping. So we need to clean it. + outboxes.remove(address) + targetOutbox.stop() +} else { + targetOutbox.send(message) +} + } + private[netty] def send(message: RequestMessage): Unit = { val remoteAddr = message.receiver.address if (remoteAddr == address) { @@ -127,37 +170,28 @@ private[netty] class NettyRpcEnv( val ack = response.asInstanceOf[Ack] logTrace(s"Received ack from ${ack.sender}") case Failure(e) => - logError(s"Exception when sending $message", e) + logWarning(s"Exception when sending $message", e) }(ThreadUtils.sameThread) } else {
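The core of the fix is the per-address outbox: messages sent while the connection is still being established are buffered and then flushed in order once it is up. Below is a minimal, single-threaded sketch of that pattern; names are illustrative and the real Outbox additionally handles concurrency, connection failures, and stopping.

```scala
import scala.collection.mutable

final class SimpleOutbox[Msg](sendNow: Msg => Unit) {
  private val pending = mutable.Queue.empty[Msg]
  private var connected = false

  def send(msg: Msg): Unit =
    if (connected) sendNow(msg)    // connection established: deliver immediately
    else pending.enqueue(msg)      // still connecting: buffer to preserve ordering

  def onConnected(): Unit = {
    connected = true
    while (pending.nonEmpty) sendNow(pending.dequeue())  // flush in FIFO order
  }
}

object OutboxDemo extends App {
  val delivered = mutable.Buffer.empty[String]
  val outbox = new SimpleOutbox[String](delivered += _)
  outbox.send("A")        // ref.send("A") while the connection is being set up
  outbox.send("B")        // ref.send("B") immediately afterwards
  outbox.onConnected()    // both are flushed once connected
  println(delivered.mkString(", "))   // A, B — never B before A
}
```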
spark git commit: [SPARK-11134][CORE] Increase LauncherBackendSuite timeout.
Repository: spark Updated Branches: refs/heads/master a88c66ca8 -> fa6a4fbf0 [SPARK-11134][CORE] Increase LauncherBackendSuite timeout. This test can take a little while to finish on slow / loaded machines. Author: Marcelo VanzinCloses #9235 from vanzin/SPARK-11134. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa6a4fbf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa6a4fbf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa6a4fbf Branch: refs/heads/master Commit: fa6a4fbf08c8cca36cbe9f0d2bd20bc7be2ca45d Parents: a88c66c Author: Marcelo Vanzin Authored: Thu Oct 22 22:41:21 2015 -0700 Committer: Reynold Xin Committed: Thu Oct 22 22:41:21 2015 -0700 -- .../scala/org/apache/spark/launcher/LauncherBackendSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fa6a4fbf/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala index 07e8869..639d1da 100644 --- a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala @@ -54,13 +54,13 @@ class LauncherBackendSuite extends SparkFunSuite with Matchers { .startApplication() try { - eventually(timeout(10 seconds), interval(100 millis)) { + eventually(timeout(30 seconds), interval(100 millis)) { handle.getAppId() should not be (null) } handle.stop() - eventually(timeout(10 seconds), interval(100 millis)) { + eventually(timeout(30 seconds), interval(100 millis)) { handle.getState() should be (SparkAppHandle.State.KILLED) } } finally { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: Fix a (very tiny) typo
Repository: spark Updated Branches: refs/heads/master fa6a4fbf0 -> b1c1597e3 Fix a (very tiny) typo Author: Jacek Laskowski Closes #9230 from jaceklaskowski/utils-seconds-typo. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1c1597e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1c1597e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1c1597e Branch: refs/heads/master Commit: b1c1597e3c47f1912809f3c5ab21833fa4241b54 Parents: fa6a4fb Author: Jacek Laskowski Authored: Thu Oct 22 22:42:15 2015 -0700 Committer: Reynold Xin Committed: Thu Oct 22 22:42:15 2015 -0700 -- core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b1c1597e/core/src/main/scala/org/apache/spark/util/Utils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 5595040..5a976ee 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -952,7 +952,7 @@ private[spark] object Utils extends Logging { } /** - * Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If + * Convert a time parameter such as (50s, 100ms, or 250us) to seconds for internal use. If * no suffix is provided, the passed number is assumed to be in seconds. */ def timeStringAsSeconds(str: String): Long = {
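For illustration, a small standalone parser with the behavior the corrected comment describes (a unit suffix is honored, and a bare number is taken as seconds). This is a sketch, not the actual `Utils.timeStringAsSeconds` implementation:

```
import java.util.concurrent.TimeUnit

object TimeStringExample {
  // Suffixes recognized by this sketch and their units.
  private val units = Map(
    "us" -> TimeUnit.MICROSECONDS,
    "ms" -> TimeUnit.MILLISECONDS,
    "s"  -> TimeUnit.SECONDS,
    "m"  -> TimeUnit.MINUTES,
    "h"  -> TimeUnit.HOURS,
    "d"  -> TimeUnit.DAYS)

  def timeStringAsSeconds(str: String): Long = {
    val trimmed = str.trim.toLowerCase
    val (digits, suffix) = trimmed.span(_.isDigit)
    val unit = if (suffix.isEmpty) TimeUnit.SECONDS else units(suffix)
    TimeUnit.SECONDS.convert(digits.toLong, unit)
  }

  def main(args: Array[String]): Unit = {
    println(timeStringAsSeconds("50s"))   // 50
    println(timeStringAsSeconds("100ms")) // 0 (truncated toward zero)
    println(timeStringAsSeconds("2m"))    // 120
    println(timeStringAsSeconds("42"))    // 42, no suffix means seconds
  }
}
```

In the sketch, `timeStringAsSeconds("100ms")` yields 0, which is why the old "to microseconds" wording in the doc comment was misleading.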
[4/4] spark git commit: [SPARK-10708] Consolidate sort shuffle implementations
[SPARK-10708] Consolidate sort shuffle implementations There's a lot of duplication between SortShuffleManager and UnsafeShuffleManager. Given that these now provide the same set of functionality, now that UnsafeShuffleManager supports large records, I think that we should replace SortShuffleManager's serialized shuffle implementation with UnsafeShuffleManager's and should merge the two managers together. Author: Josh RosenCloses #8829 from JoshRosen/consolidate-sort-shuffle-implementations. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6d06adf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6d06adf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6d06adf Branch: refs/heads/master Commit: f6d06adf05afa9c5386dc2396c94e7a98730289f Parents: 94e2064 Author: Josh Rosen Authored: Thu Oct 22 09:46:30 2015 -0700 Committer: Josh Rosen Committed: Thu Oct 22 09:46:30 2015 -0700 -- .../sort/BypassMergeSortShuffleWriter.java | 106 +++- .../spark/shuffle/sort/PackedRecordPointer.java | 92 +++ .../shuffle/sort/ShuffleExternalSorter.java | 491 .../shuffle/sort/ShuffleInMemorySorter.java | 124 .../shuffle/sort/ShuffleSortDataFormat.java | 67 +++ .../shuffle/sort/SortShuffleFileWriter.java | 53 -- .../apache/spark/shuffle/sort/SpillInfo.java| 37 ++ .../spark/shuffle/sort/UnsafeShuffleWriter.java | 489 .../shuffle/unsafe/PackedRecordPointer.java | 92 --- .../apache/spark/shuffle/unsafe/SpillInfo.java | 37 -- .../unsafe/UnsafeShuffleExternalSorter.java | 479 .../unsafe/UnsafeShuffleInMemorySorter.java | 124 .../unsafe/UnsafeShuffleSortDataFormat.java | 67 --- .../shuffle/unsafe/UnsafeShuffleWriter.java | 489 .../main/scala/org/apache/spark/SparkEnv.scala | 2 +- .../spark/shuffle/sort/SortShuffleManager.scala | 175 +- .../spark/shuffle/sort/SortShuffleWriter.scala | 28 +- .../shuffle/unsafe/UnsafeShuffleManager.scala | 202 --- .../spark/util/collection/ChainedBuffer.scala | 146 - .../spark/util/collection/ExternalSorter.scala | 35 +- .../PartitionedSerializedPairBuffer.scala | 273 - .../shuffle/sort/PackedRecordPointerSuite.java | 102 .../sort/ShuffleInMemorySorterSuite.java| 124 .../shuffle/sort/UnsafeShuffleWriterSuite.java | 560 +++ .../unsafe/PackedRecordPointerSuite.java| 101 .../UnsafeShuffleInMemorySorterSuite.java | 124 .../unsafe/UnsafeShuffleWriterSuite.java| 560 --- .../org/apache/spark/SortShuffleSuite.scala | 65 +++ .../spark/scheduler/DAGSchedulerSuite.scala | 6 +- .../BypassMergeSortShuffleWriterSuite.scala | 64 ++- .../shuffle/sort/SortShuffleManagerSuite.scala | 131 + .../shuffle/sort/SortShuffleWriterSuite.scala | 45 -- .../unsafe/UnsafeShuffleManagerSuite.scala | 129 - .../shuffle/unsafe/UnsafeShuffleSuite.scala | 102 .../util/collection/ChainedBufferSuite.scala| 144 - .../PartitionedSerializedPairBufferSuite.scala | 148 - docs/configuration.md | 7 +- project/MimaExcludes.scala | 9 +- .../apache/spark/sql/execution/Exchange.scala | 23 +- .../execution/UnsafeRowSerializerSuite.scala| 9 +- 40 files changed, 2600 insertions(+), 3461 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java -- diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index f5d80bb..ee82d67 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -21,21 +21,30 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import javax.annotation.Nullable; +import scala.None$; +import scala.Option; import scala.Product2; import scala.Tuple2; import scala.collection.Iterator; +import com.google.common.annotations.VisibleForTesting; import com.google.common.io.Closeables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.spark.Partitioner; +import org.apache.spark.ShuffleDependency; import org.apache.spark.SparkConf; import org.apache.spark.TaskContext; import
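As background for the consolidation, here is a rough sketch of the kind of write-path selection a unified sort-based shuffle manager ends up making. The predicate names and thresholds below are placeholders chosen for illustration, not the actual `SortShuffleManager` logic:

```
// Placeholder inputs for this sketch; the real decision inspects the
// ShuffleDependency, the SparkConf and the serializer instance.
case class ShuffleProfile(
    numPartitions: Int,
    mapSideCombine: Boolean,
    hasAggregator: Boolean,
    serializerSupportsRelocation: Boolean,
    bypassMergeThreshold: Int = 200)

sealed trait WritePath
case object BypassMergeSort extends WritePath   // one file per partition, then concatenate
case object SerializedSort extends WritePath    // sort serialized records by partition id
case object DeserializedSort extends WritePath  // general ExternalSorter path

object ShufflePathExample {
  def choosePath(p: ShuffleProfile): WritePath = {
    if (!p.mapSideCombine && p.numPartitions <= p.bypassMergeThreshold) {
      BypassMergeSort
    } else if (p.serializerSupportsRelocation && !p.hasAggregator) {
      SerializedSort
    } else {
      DeserializedSort
    }
  }

  def main(args: Array[String]): Unit = {
    println(choosePath(ShuffleProfile(100, mapSideCombine = false, hasAggregator = false,
      serializerSupportsRelocation = true)))  // BypassMergeSort
    println(choosePath(ShuffleProfile(5000, mapSideCombine = false, hasAggregator = false,
      serializerSupportsRelocation = true)))  // SerializedSort
  }
}
```

The commit's point is that paths like these now live behind a single manager rather than being split across `SortShuffleManager` and `UnsafeShuffleManager`.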
[3/4] spark git commit: [SPARK-10708] Consolidate sort shuffle implementations
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java -- diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java deleted file mode 100644 index e73ba39..000 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java +++ /dev/null @@ -1,479 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.shuffle.unsafe; - -import javax.annotation.Nullable; -import java.io.File; -import java.io.IOException; -import java.util.LinkedList; - -import scala.Tuple2; - -import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.spark.SparkConf; -import org.apache.spark.TaskContext; -import org.apache.spark.executor.ShuffleWriteMetrics; -import org.apache.spark.serializer.DummySerializerInstance; -import org.apache.spark.serializer.SerializerInstance; -import org.apache.spark.shuffle.ShuffleMemoryManager; -import org.apache.spark.storage.BlockManager; -import org.apache.spark.storage.DiskBlockObjectWriter; -import org.apache.spark.storage.TempShuffleBlockId; -import org.apache.spark.unsafe.Platform; -import org.apache.spark.unsafe.array.ByteArrayMethods; -import org.apache.spark.unsafe.memory.MemoryBlock; -import org.apache.spark.unsafe.memory.TaskMemoryManager; -import org.apache.spark.util.Utils; - -/** - * An external sorter that is specialized for sort-based shuffle. - * - * Incoming records are appended to data pages. When all records have been inserted (or when the - * current thread's shuffle memory limit is reached), the in-memory records are sorted according to - * their partition ids (using a {@link UnsafeShuffleInMemorySorter}). The sorted records are then - * written to a single output file (or multiple files, if we've spilled). The format of the output - * files is the same as the format of the final output file written by - * {@link org.apache.spark.shuffle.sort.SortShuffleWriter}: each output partition's records are - * written as a single serialized, compressed stream that can be read with a new decompression and - * deserialization stream. - * - * Unlike {@link org.apache.spark.util.collection.ExternalSorter}, this sorter does not merge its - * spill files. Instead, this merging is performed in {@link UnsafeShuffleWriter}, which uses a - * specialized merge procedure that avoids extra serialization/deserialization. 
- */ -final class UnsafeShuffleExternalSorter { - - private final Logger logger = LoggerFactory.getLogger(UnsafeShuffleExternalSorter.class); - - @VisibleForTesting - static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024; - - private final int initialSize; - private final int numPartitions; - private final int pageSizeBytes; - @VisibleForTesting - final int maxRecordSizeBytes; - private final TaskMemoryManager taskMemoryManager; - private final ShuffleMemoryManager shuffleMemoryManager; - private final BlockManager blockManager; - private final TaskContext taskContext; - private final ShuffleWriteMetrics writeMetrics; - - /** The buffer size to use when writing spills using DiskBlockObjectWriter */ - private final int fileBufferSizeBytes; - - /** - * Memory pages that hold the records being sorted. The pages in this list are freed when - * spilling, although in principle we could recycle these pages across spills (on the other hand, - * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager - * itself). - */ - private final LinkedList allocatedPages = new LinkedList(); - - private final LinkedList spills = new LinkedList(); - - /** Peak memory used by this sorter so far, in bytes. **/ - private long peakMemoryUsedBytes; - - // These variables are reset after spilling: - @Nullable private UnsafeShuffleInMemorySorter inMemSorter; - @Nullable private MemoryBlock currentPage = null; - private long currentPagePosition = -1; - private long
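The class comment above describes sorting records by partition id without touching the serialized bytes: only a compact array of pointers is sorted. A toy sketch of that idea, packing a partition id and a record index into one `Long` (the field widths here are arbitrary and not those of the real `PackedRecordPointer`):

```
object PartitionSortExample {
  // Pack a partition id (upper bits) and a record index (lower 24 bits) into a
  // single Long, so sorting the Longs sorts entries by partition id.
  def pack(partitionId: Int, recordIndex: Int): Long =
    (partitionId.toLong << 24) | (recordIndex & 0xFFFFFF).toLong

  def partitionOf(packed: Long): Int = (packed >>> 24).toInt
  def recordOf(packed: Long): Int = (packed & 0xFFFFFF).toInt

  def main(args: Array[String]): Unit = {
    val entries = Array(pack(3, 0), pack(1, 1), pack(3, 2), pack(0, 3))
    java.util.Arrays.sort(entries)          // entries are now grouped by partition id
    entries.foreach { e =>
      println(s"partition=${partitionOf(e)} record=${recordOf(e)}")
    }
  }
}
```

Sorting the packed longs groups entries by partition id, so the serialized record bytes can then be streamed out partition by partition without being deserialized.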
[2/4] spark git commit: [SPARK-10708] Consolidate sort shuffle implementations
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala deleted file mode 100644 index 87a786b..000 --- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala +++ /dev/null @@ -1,273 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util.collection - -import java.io.InputStream -import java.nio.IntBuffer -import java.util.Comparator - -import org.apache.spark.serializer.{JavaSerializerInstance, SerializerInstance} -import org.apache.spark.storage.DiskBlockObjectWriter -import org.apache.spark.util.collection.PartitionedSerializedPairBuffer._ - -/** - * Append-only buffer of key-value pairs, each with a corresponding partition ID, that serializes - * its records upon insert and stores them as raw bytes. - * - * We use two data-structures to store the contents. The serialized records are stored in a - * ChainedBuffer that can expand gracefully as records are added. This buffer is accompanied by a - * metadata buffer that stores pointers into the data buffer as well as the partition ID of each - * record. Each entry in the metadata buffer takes up a fixed amount of space. - * - * Sorting the collection means swapping entries in the metadata buffer - the record buffer need not - * be modified at all. Storing the partition IDs in the metadata buffer means that comparisons can - * happen without following any pointers, which should minimize cache misses. - * - * Currently, only sorting by partition is supported. - * - * Each record is laid out inside the the metaBuffer as follows. keyStart, a long, is split across - * two integers: - * - * +-+++-+ - * | keyStart | keyValLen | partitionId | - * +-+++-+ - * - * The buffer can support up to `536870911 (2 ^ 29 - 1)` records. - * - * @param metaInitialRecords The initial number of entries in the metadata buffer. - * @param kvBlockSize The size of each byte buffer in the ChainedBuffer used to store the records. - * @param serializerInstance the serializer used for serializing inserted records. 
- */ -private[spark] class PartitionedSerializedPairBuffer[K, V]( -metaInitialRecords: Int, -kvBlockSize: Int, -serializerInstance: SerializerInstance) - extends WritablePartitionedPairCollection[K, V] with SizeTracker { - - if (serializerInstance.isInstanceOf[JavaSerializerInstance]) { -throw new IllegalArgumentException("PartitionedSerializedPairBuffer does not support" + - " Java-serialized objects.") - } - - require(metaInitialRecords <= MAXIMUM_RECORDS, -s"Can't make capacity bigger than ${MAXIMUM_RECORDS} records") - private var metaBuffer = IntBuffer.allocate(metaInitialRecords * RECORD_SIZE) - - private val kvBuffer: ChainedBuffer = new ChainedBuffer(kvBlockSize) - private val kvOutputStream = new ChainedBufferOutputStream(kvBuffer) - private val kvSerializationStream = serializerInstance.serializeStream(kvOutputStream) - - def insert(partition: Int, key: K, value: V): Unit = { -if (metaBuffer.position == metaBuffer.capacity) { - growMetaBuffer() -} - -val keyStart = kvBuffer.size -kvSerializationStream.writeKey[Any](key) -kvSerializationStream.writeValue[Any](value) -kvSerializationStream.flush() -val keyValLen = (kvBuffer.size - keyStart).toInt - -// keyStart, a long, gets split across two ints -metaBuffer.put(keyStart.toInt) -metaBuffer.put((keyStart >> 32).toInt) -metaBuffer.put(keyValLen) -metaBuffer.put(partition) - } - - /** Double the size of the array because we've reached capacity */ - private def growMetaBuffer(): Unit = { -if (metaBuffer.capacity >= MAXIMUM_META_BUFFER_CAPACITY) { - throw new IllegalStateException(s"Can't insert more than
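One detail of the removed buffer's layout worth spelling out is how the `Long` offset `keyStart` is stored across two `Int` slots of the metadata `IntBuffer`, as the deleted `insert` method shows (`keyStart.toInt` followed by `(keyStart >> 32).toInt`). A small illustrative sketch of the split and the reassembly:

```
import java.nio.IntBuffer

object SplitLongExample {
  def main(args: Array[String]): Unit = {
    val meta = IntBuffer.allocate(4)
    val keyStart: Long = 0x123456789L     // an offset larger than Int.MaxValue

    // Write: low 32 bits first, then high 32 bits (same order as the deleted buffer).
    meta.put(keyStart.toInt)
    meta.put((keyStart >> 32).toInt)

    // Read back and reassemble, masking the low word to avoid sign extension.
    val lo = meta.get(0)
    val hi = meta.get(1)
    val reassembled = (hi.toLong << 32) | (lo.toLong & 0xFFFFFFFFL)

    assert(reassembled == keyStart)
    println(reassembled)                  // 4886718345
  }
}
```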
[1/4] spark git commit: [SPARK-10708] Consolidate sort shuffle implementations
Repository: spark Updated Branches: refs/heads/master 94e2064fa -> f6d06adf0 http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala index 341f56d..b92a302 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala @@ -33,7 +33,8 @@ import org.scalatest.BeforeAndAfterEach import org.apache.spark._ import org.apache.spark.executor.{TaskMetrics, ShuffleWriteMetrics} -import org.apache.spark.serializer.{SerializerInstance, Serializer, JavaSerializer} +import org.apache.spark.shuffle.IndexShuffleBlockResolver +import org.apache.spark.serializer.{JavaSerializer, SerializerInstance} import org.apache.spark.storage._ import org.apache.spark.util.Utils @@ -42,25 +43,31 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte @Mock(answer = RETURNS_SMART_NULLS) private var blockManager: BlockManager = _ @Mock(answer = RETURNS_SMART_NULLS) private var diskBlockManager: DiskBlockManager = _ @Mock(answer = RETURNS_SMART_NULLS) private var taskContext: TaskContext = _ + @Mock(answer = RETURNS_SMART_NULLS) private var blockResolver: IndexShuffleBlockResolver = _ + @Mock(answer = RETURNS_SMART_NULLS) private var dependency: ShuffleDependency[Int, Int, Int] = _ private var taskMetrics: TaskMetrics = _ - private var shuffleWriteMetrics: ShuffleWriteMetrics = _ private var tempDir: File = _ private var outputFile: File = _ private val conf: SparkConf = new SparkConf(loadDefaults = false) private val temporaryFilesCreated: mutable.Buffer[File] = new ArrayBuffer[File]() private val blockIdToFileMap: mutable.Map[BlockId, File] = new mutable.HashMap[BlockId, File] - private val shuffleBlockId: ShuffleBlockId = new ShuffleBlockId(0, 0, 0) - private val serializer: Serializer = new JavaSerializer(conf) + private var shuffleHandle: BypassMergeSortShuffleHandle[Int, Int] = _ override def beforeEach(): Unit = { tempDir = Utils.createTempDir() outputFile = File.createTempFile("shuffle", null, tempDir) -shuffleWriteMetrics = new ShuffleWriteMetrics taskMetrics = new TaskMetrics -taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics) MockitoAnnotations.initMocks(this) +shuffleHandle = new BypassMergeSortShuffleHandle[Int, Int]( + shuffleId = 0, + numMaps = 2, + dependency = dependency +) +when(dependency.partitioner).thenReturn(new HashPartitioner(7)) +when(dependency.serializer).thenReturn(Some(new JavaSerializer(conf))) when(taskContext.taskMetrics()).thenReturn(taskMetrics) +when(blockResolver.getDataFile(0, 0)).thenReturn(outputFile) when(blockManager.diskBlockManager).thenReturn(diskBlockManager) when(blockManager.getDiskWriter( any[BlockId], @@ -107,18 +114,20 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte test("write empty iterator") { val writer = new BypassMergeSortShuffleWriter[Int, Int]( - new SparkConf(loadDefaults = false), blockManager, - new HashPartitioner(7), - shuffleWriteMetrics, - serializer + blockResolver, + shuffleHandle, + 0, // MapId + taskContext, + conf ) -writer.insertAll(Iterator.empty) -val partitionLengths = writer.writePartitionedFile(shuffleBlockId, taskContext, outputFile) 
-assert(partitionLengths.sum === 0) +writer.write(Iterator.empty) +writer.stop( /* success = */ true) +assert(writer.getPartitionLengths.sum === 0) assert(outputFile.exists()) assert(outputFile.length() === 0) assert(temporaryFilesCreated.isEmpty) +val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get assert(shuffleWriteMetrics.shuffleBytesWritten === 0) assert(shuffleWriteMetrics.shuffleRecordsWritten === 0) assert(taskMetrics.diskBytesSpilled === 0) @@ -129,17 +138,19 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte def records: Iterator[(Int, Int)] = Iterator((1, 1), (5, 5)) ++ (0 until 10).iterator.map(x => (2, 2)) val writer = new BypassMergeSortShuffleWriter[Int, Int]( - new SparkConf(loadDefaults = false), blockManager, - new HashPartitioner(7), - shuffleWriteMetrics, - serializer + blockResolver, + shuffleHandle, + 0, // MapId + taskContext, + conf ) -writer.insertAll(records) +writer.write(records) +writer.stop( /* success = */ true)
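The rewritten suite relies on Mockito to stub the writer's collaborators (`BlockManager`, `IndexShuffleBlockResolver`, `ShuffleDependency`) rather than constructing real ones. Reduced to a self-contained sketch with a hypothetical `KeyValueStore` collaborator, the stubbing pattern looks like this (an illustration, not the Spark suite itself):

```
import org.mockito.Mockito.{mock, verify, when, RETURNS_SMART_NULLS}

// Hypothetical collaborator we want to stub in a test.
trait KeyValueStore {
  def get(key: String): String
  def put(key: String, value: String): Unit
}

object MockitoSketch {
  def main(args: Array[String]): Unit = {
    // Smart nulls give a descriptive error if an unstubbed call's result is used.
    val store = mock(classOf[KeyValueStore], RETURNS_SMART_NULLS)

    when(store.get("shuffle_0_0")).thenReturn("outputFile")

    assert(store.get("shuffle_0_0") == "outputFile")
    store.put("shuffle_0_0", "done")
    verify(store).put("shuffle_0_0", "done")
  }
}
```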