[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r83991043
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 ---
@@ -32,13 +32,21 @@ import org.apache.spark.sql.types.StructType
  * @param table the table's (unqualified) name
  * @param partitionSchema the schema of a partitioned table's partition 
columns
  * @param sizeInBytes the table's data size in bytes
+ * @param enableFileStatusCache whether to enable file status caching
  */
 class TableFileCatalog(
 sparkSession: SparkSession,
 db: String,
 table: String,
 partitionSchema: Option[StructType],
-override val sizeInBytes: Long) extends FileCatalog {
+override val sizeInBytes: Long,
+enableFileStatusCache: Boolean) extends FileCatalog {
--- End diff --

when will users want to turn it off?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrainedSche...

2016-10-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15481
  
**[Test build #67155 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67155/consoleFull)**
 for PR 15481 at commit 
[`af6072a`](https://github.com/apache/spark/commit/af6072a9846e418ece86e03c94a1a8da0f0cd928).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15542: [SPARK-17996][SQL] Fix unqualified catalog.getFunction(....

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15542
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/67161/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15546: [SPARK-17982][SQL] SQLBuilder should wrap the generated ...

2016-10-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15546
  
**[Test build #67166 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67166/consoleFull)**
 for PR 15546 at commit 
[`2af55c8`](https://github.com/apache/spark/commit/2af55c844a570e7c32af363842562a03c570e5e5).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15543: [SPARK-18001] [Document] fix broke link to SparkDataFram...

2016-10-18 Thread rxin
Github user rxin commented on the issue:

https://github.com/apache/spark/pull/15543
  
Actually I'm guessing it is Tommy Yu


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83995272
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the 
internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of 
Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment 
strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is 
accepted or not so that
+  // the assigner can decide whether the current worker is valid for the 
next offering.
+  def offerAccepted(assigned: 

[GitHub] spark issue #15543: [SPARK-18001] [Document] fix broke link to SparkDataFram...

2016-10-18 Thread rxin
Github user rxin commented on the issue:

https://github.com/apache/spark/pull/15543
  
@Wenpei what's your apache jira id? I need that in order to assign the 
ticket to you.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15543: [SPARK-18001] [Document] fix broke link to SparkD...

2016-10-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/15543


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15297: [WIP][SPARK-9862]Handling data skew

2016-10-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15297
  
**[Test build #67162 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67162/consoleFull)**
 for PR 15297 at commit 
[`36624ec`](https://github.com/apache/spark/commit/36624ec94fdc3a2eeeadea1d86cdf2e794198434).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15546: [SPARK-17982][SQL] SQLBuilder should wrap the generated ...

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15546
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/67166/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15297: [WIP][SPARK-9862]Handling data skew

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15297
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/67162/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15546: [SPARK-17982][SQL] SQLBuilder should wrap the generated ...

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15546
  
Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83999827
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the 
internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of 
Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment 
strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is 
accepted or not so that
+  // the assigner can decide whether the current worker is valid for the 
next offering.
+  def offerAccepted(assigned: Boolean): 

[GitHub] spark issue #15543: [SPARK-18001] [Document] fix broke link to SparkDataFram...

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15543
  
Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15545: [SPARK-17999][Kafka][SQL] Add getPreferredLocatio...

2016-10-18 Thread jerryshao
GitHub user jerryshao opened a pull request:

https://github.com/apache/spark/pull/15545

[SPARK-17999][Kafka][SQL] Add getPreferredLocations for KafkaSourceRDD

## What changes were proposed in this pull request?

The newly implemented Structured Streaming `KafkaSource` did calculate the 
preferred locations for each topic partition, but didn't offer this information 
through RDD's `getPreferredLocations` method. So here propose to add this 
method in `KafkaSourceRDD`.

## How was this patch tested?

Manual verification.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jerryshao/apache-spark SPARK-17999

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15545.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15545


commit 1d8622f86d22061eaf1db07404ea1a334faaa266
Author: jerryshao 
Date:   2016-10-19T03:02:02Z

Add getPreferredLocations for KafkaSourceRDD




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...

2016-10-18 Thread wzhfy
Github user wzhfy commented on the issue:

https://github.com/apache/spark/pull/15544
  
@hvanhovell @yhuai @cloud-fan Can you review this? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2016-10-18 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r83993309
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala
 ---
@@ -142,319 +84,37 @@ case class HyperLogLogPlusPlus(
 
   override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)
 
-  override def aggBufferSchema: StructType = 
StructType.fromAttributes(aggBufferAttributes)
-
-  /** Allocate enough words to store all registers. */
-  override val aggBufferAttributes: Seq[AttributeReference] = 
Seq.tabulate(numWords) { i =>
-AttributeReference(s"MS[$i]", LongType)()
+  override def createAggregationBuffer(): HyperLogLogPlusPlusAlgo = {
+new HyperLogLogPlusPlusAlgo(relativeSD)
--- End diff --

This, by definition, moves HLL++ operator to the sort based aggregation 
path. This will be a huge performance hit. Is it possible to separate the 
algorithm from the buffer?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15546: [SPARK-17982][SQL] SQLBuilder should wrap the generated ...

2016-10-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15546
  
**[Test build #67165 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67165/consoleFull)**
 for PR 15546 at commit 
[`85c0686`](https://github.com/apache/spark/commit/85c0686a90395f3a7b56f22d78654e83e7ede7a6).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15546: [SPARK-17982][SQL] SQLBuilder should wrap the generated ...

2016-10-18 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/15546
  
Ooops. Thank you for fixing me! @gatorsmile 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2016-10-18 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r83997249
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala
 ---
@@ -142,319 +84,37 @@ case class HyperLogLogPlusPlus(
 
   override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)
 
-  override def aggBufferSchema: StructType = 
StructType.fromAttributes(aggBufferAttributes)
-
-  /** Allocate enough words to store all registers. */
-  override val aggBufferAttributes: Seq[AttributeReference] = 
Seq.tabulate(numWords) { i =>
-AttributeReference(s"MS[$i]", LongType)()
+  override def createAggregationBuffer(): HyperLogLogPlusPlusAlgo = {
+new HyperLogLogPlusPlusAlgo(relativeSD)
--- End diff --

@hvanhovell Do you mean we put some digest object (like how 
PercentileDigest does in ApproximatePercentile) in the buffer instead of 
putting HLL++ directly in it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83999452
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the 
internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of 
Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment 
strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is 
accepted or not so that
+  // the assigner can decide whether the current worker is valid for the 
next offering.
+  def offerAccepted(assigned: Boolean): 

[GitHub] spark pull request #15550: [SPARK-18003][Spark Core] Fix bug of RDD zipWithI...

2016-10-18 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15550#discussion_r83999445
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -833,6 +833,30 @@ class RDDSuite extends SparkFunSuite with 
SharedSparkContext {
 }
   }
 
+  test("zipWithIndex with partition size exceeding MaxInt") {
+val result = sc.parallelize(Seq(1), 1).mapPartitions(
--- End diff --

all right. I'll update the testcase.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15476: [SPARK-17926][SQL][STREAMING] Added json for statuses

2016-10-18 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/15476
  
lgtm1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15539: [SPARK-17994] [SQL] Add back a file status cache for cat...

2016-10-18 Thread mallman
Github user mallman commented on the issue:

https://github.com/apache/spark/pull/15539
  
@ericl I took this PR for a test drive with some large-ish tables. 
Everything appeared to work as expected.

As far as performance goes, planning a simple select on a partitioned table 
(with cached partition metadata) took less than 1 second up to about 3k 
partitions. It was about 1.5 seconds with 5k partitions, 2 seconds with about 
7.5k partitions and about 3.2 seconds with 10k partitions. Does that align with 
your findings?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15546: [SPARK-17892][SQL] SQLBuilder should wrap the generated ...

2016-10-18 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/15546
  
Wrong JIRA number. : )



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15541: [SPARK-17637][Scheduler]Packed scheduling for Spark task...

2016-10-18 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/15541
  
@rxin @gatorsmile Can you please take a look, and kindly provide your 
comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15549: Bootstrap perf

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15549
  
Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83998058
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the 
internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of 
Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment 
strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is 
accepted or not so that
+  // the assigner can decide whether the current worker is valid for the 
next offering.
+  def offerAccepted(assigned: Boolean): 

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83998771
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the 
internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of 
Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment 
strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is 
accepted or not so that
+  // the assigner can decide whether the current worker is valid for the 
next offering.
+  def offerAccepted(assigned: Boolean): 

[GitHub] spark pull request #15550: [SPARK-18003][Spark Core] Fix bug of RDD zipWithI...

2016-10-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15550#discussion_r83998765
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala 
---
@@ -64,8 +64,14 @@ class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) 
extends RDD[(T, Long)](prev)
 
   override def compute(splitIn: Partition, context: TaskContext): 
Iterator[(T, Long)] = {
 val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
-firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
-  (x._1, split.startIndex + x._2)
+val parentIter = firstParent[T].iterator(split.prev, context)
+new Iterator[(T, Long)] {
+  var idxAcc: Long = -1L
--- End diff --

private[this]


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83996049
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the 
internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of 
Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment 
strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is 
accepted or not so that
+  // the assigner can decide whether the current worker is valid for the 
next offering.
+  def offerAccepted(assigned: 

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84000957
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the 
internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of 
Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment 
strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is 
accepted or not so that
+  // the assigner can decide whether the current worker is valid for the 
next offering.
+  def offerAccepted(assigned: 

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83996692
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the 
internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of 
Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment 
strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is 
accepted or not so that
+  // the assigner can decide whether the current worker is valid for the 
next offering.
+  def offerAccepted(assigned: 

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83998150
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the 
internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of 
Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment 
strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is 
accepted or not so that
+  // the assigner can decide whether the current worker is valid for the 
next offering.
+  def offerAccepted(assigned: 

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83996326
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the 
internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of 
Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment 
strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is 
accepted or not so that
+  // the assigner can decide whether the current worker is valid for the 
next offering.
+  def offerAccepted(assigned: 

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83998358
  
--- Diff: docs/configuration.md ---
@@ -1342,6 +1342,19 @@ Apart from these, the following properties are also 
available, and may be useful
 Should be greater than or equal to 1. Number of allowed retries = this 
value - 1.
   
 
+
+  spark.scheduler.taskAssigner
+  roundrobin
+  
+The strategy of how to allocate tasks among workers with free cores. 
By default, roundrobin
--- End diff --

nit: create a list for each policy and explain inline instead of saying 
`former`, `latter` below.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83995649
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
--- End diff --

This feels like something which is set once for object generation and not 
supposed to be mutated later. I am not able to come up with better ways to do 
that. (Maybe pass it as a constructor param ?)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83995590
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
--- End diff --

- nit: use `protected`
- nit: Why camel case ? This looks like an regular class var and not a 
constant. I know you want this to be treated as a constant but probably should 
have better guarding against that



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83995463
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
--- End diff --

It might be better to put these into separate points and not as a 
paragraph. Also, I am not sure what the protocol is about putting details like 
method names in the doc. As things stand, it will serve good for people trying 
to read the code but as the codebase evolves, things might get out of sync if 
this comment is not updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83996865
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the 
internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of 
Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment 
strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is 
accepted or not so that
+  // the assigner can decide whether the current worker is valid for the 
next offering.
+  def offerAccepted(assigned: 

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83998964
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -250,24 +248,26 @@ private[spark] class TaskSchedulerImpl(
   private def resourceOfferSingleTaskSet(
   taskSet: TaskSetManager,
   maxLocality: TaskLocality,
-  shuffledOffers: Seq[WorkerOffer],
-  availableCpus: Array[Int],
-  tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
+  taskAssigner: TaskAssigner) : Boolean = {
 var launchedTask = false
-for (i <- 0 until shuffledOffers.size) {
-  val execId = shuffledOffers(i).executorId
-  val host = shuffledOffers(i).host
-  if (availableCpus(i) >= CPUS_PER_TASK) {
+taskAssigner.init()
+while(taskAssigner.hasNext) {
--- End diff --

nit: space after `while`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83995119
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
--- End diff --

"this worker" ? `this` wont represent a worker here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83999715
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the 
internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of 
Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment 
strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is 
accepted or not so that
+  // the assigner can decide whether the current worker is valid for the 
next offering.
+  def offerAccepted(assigned: 

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83995317
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
--- End diff --

nit: `different the locality restrictions` => `different locality 
restrictions`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15532: [SPARK-17989][SQL] Check ascendingOrder type in sort_arr...

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15532
  
Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83997822
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the 
internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of 
Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment 
strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is 
accepted or not so that
+  // the assigner can decide whether the current worker is valid for the 
next offering.
+  def offerAccepted(assigned: 

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83998311
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -109,6 +108,85 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
 assert(!failedTaskSet)
   }
 
+  test("Scheduler does not always schedule tasks on the same workers") {
+val taskScheduler = setupScheduler()
+roundrobin(taskScheduler)
+  }
+
+  test("User can specify the roundrobin task assigner") {
+val taskScheduler = setupScheduler(("spark.scheduler.taskAssigner", 
"RoUndrObin"))
+roundrobin(taskScheduler)
+  }
+
+  test("Fallback to roundrobin when the task assigner provided is not 
valid") {
+val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> 
"invalid")
+roundrobin(taskScheduler)
+  }
+
+  test("Scheduler balance the assignment to the worker with more free 
cores") {
--- End diff --

I think you should structure this so that tests for the same task assigner 
are packed together. Right now, one has the look into each test case to figure 
out which policy is being tested.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15532: [SPARK-17989][SQL] Check ascendingOrder type in sort_arr...

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15532
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/67167/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrai...

2016-10-18 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/15481#discussion_r83991614
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -393,7 +393,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 // Remove all the lingering executors that should be removed but not 
yet. The reason might be
 // because (1) disconnected event is not yet received; (2) executors 
die silently.
 executorDataMap.toMap.foreach { case (eid, _) =>
-  driverEndpoint.askWithRetry[Boolean](
+  driverEndpoint.send(
--- End diff --

This will only be invoked in yarn client mode when AM is failed and some 
lingering executors are existed, such situation may not be happened normally as 
I know. So I think it should be OK to call `send`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...

2016-10-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15544
  
**[Test build #67164 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67164/consoleFull)**
 for PR 15544 at commit 
[`e274ef2`](https://github.com/apache/spark/commit/e274ef22b96ca878df326675e28566cfff6b5088).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15545: [SPARK-17999][Kafka][SQL] Add getPreferredLocations for ...

2016-10-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15545
  
**[Test build #67163 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67163/consoleFull)**
 for PR 15545 at commit 
[`1d8622f`](https://github.com/apache/spark/commit/1d8622f86d22061eaf1db07404ea1a334faaa266).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83995415
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the 
internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of 
Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment 
strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is 
accepted or not so that
+  // the assigner can decide whether the current worker is valid for the 
next offering.
+  def offerAccepted(assigned: 

[GitHub] spark pull request #15547: [SPARK-18002][SQL] Pruning unnecessary IsNotNull ...

2016-10-18 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/15547

[SPARK-18002][SQL] Pruning unnecessary IsNotNull predicates from Filter

## What changes were proposed in this pull request?

In `PruneFilters` rule, we can prune unnecessary `IsNotNull` predicates if 
the predicate in `IsNotNull` is not nullable.


## How was this patch tested?

Jenkins tests.

Please review 
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before 
opening a pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 prune-isnotnull

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15547.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15547


commit a264ebac6924ae7184abe6fc75fa0dfb659b7d54
Author: Liang-Chi Hsieh 
Date:   2016-10-19T04:11:54Z

Pruning unnecessary IsNotNull predicates from Filter.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15548: [SPARK-17985][CORE] Bump commons-lang3 version to...

2016-10-18 Thread ueshin
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/15548

[SPARK-17985][CORE] Bump commons-lang3 version to 3.5.

## What changes were proposed in this pull request?

`SerializationUtils.clone()` of commons-lang3 (<3.5) has a bug that breaks 
thread safety, which gets stack sometimes caused by race condition of 
initializing hash map.
See https://issues.apache.org/jira/browse/LANG-1251.

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-17985

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15548.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15548


commit f318dffd4137c20bdc67ac054e345d55703d96de
Author: Takuya UESHIN 
Date:   2016-10-18T02:42:14Z

Bump commons-lang3 version to 3.5.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15546: [SPARK-17982][SQL] SQLBuilder should wrap the generated ...

2016-10-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15546
  
**[Test build #67166 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67166/consoleFull)**
 for PR 15546 at commit 
[`2af55c8`](https://github.com/apache/spark/commit/2af55c844a570e7c32af363842562a03c570e5e5).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15550: [SPARK-18003][Spark Core] Fix bug of RDD zipWithIndex ge...

2016-10-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15550
  
**[Test build #67170 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67170/consoleFull)**
 for PR 15550 at commit 
[`18c3f49`](https://github.com/apache/spark/commit/18c3f4914ca3fb5800dd68ac9d689b6f11cb2e92).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83999756
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the 
internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of 
Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment 
strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is 
accepted or not so that
+  // the assigner can decide whether the current worker is valid for the 
next offering.
+  def offerAccepted(assigned: Boolean): 

[GitHub] spark issue #15505: [SPARK-17931][CORE] taskScheduler has some unneeded seri...

2016-10-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15505
  
**[Test build #67159 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67159/consoleFull)**
 for PR 15505 at commit 
[`589f3bb`](https://github.com/apache/spark/commit/589f3bba59be5c34a031584c8c9b53b0f48533be).
 * This patch **fails from timeout after a configured wait of \`250m\`**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15505: [SPARK-17931][CORE] taskScheduler has some unneeded seri...

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15505
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/67159/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15434: [SPARK-17873][SQL] ALTER TABLE RENAME TO should allow us...

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15434
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/67154/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15434: [SPARK-17873][SQL] ALTER TABLE RENAME TO should allow us...

2016-10-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15434
  
**[Test build #67154 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67154/consoleFull)**
 for PR 15434 at commit 
[`65c1885`](https://github.com/apache/spark/commit/65c1885818e4b712c2132e7e97e0b96ceb3f6dd7).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15434: [SPARK-17873][SQL] ALTER TABLE RENAME TO should allow us...

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15434
  
Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15471: [SPARK-17919] Make timeout to RBackend configurable in S...

2016-10-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15471
  
**[Test build #67153 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67153/consoleFull)**
 for PR 15471 at commit 
[`fcc3376`](https://github.com/apache/spark/commit/fcc33764259b75f566adf9573cd503cac66d32e0).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15434: [SPARK-17873][SQL] ALTER TABLE RENAME TO should a...

2016-10-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/15434


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15543: [SPARK-18001] [Document] fix broke link to SparkD...

2016-10-18 Thread Wenpei
GitHub user Wenpei opened a pull request:

https://github.com/apache/spark/pull/15543

[SPARK-18001] [Document] fix broke link to SparkDataFrame

## What changes were proposed in this pull request?

In http://spark.apache.org/docs/latest/sql-programming-guide.html, Section 
"Untyped Dataset Operations (aka DataFrame Operations)"

Link to R DataFrame doesn't work that return
The requested URL /docs/latest/api/R/DataFrame.html was not found on this 
server.

Correct link is SparkDataFrame.html for spark 2.0

## How was this patch tested?

Manual checked.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Wenpei/spark spark-18001

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15543.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15543


commit ca18ad54314774af8d9b9eab5a15842602d241b4
Author: Tommy YU 
Date:   2016-10-19T03:20:54Z

fix broke link to SparkDataFrame




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15525: [SPARK-17985][CORE] Bump commons-lang3 version to 3.5.

2016-10-18 Thread rxin
Github user rxin commented on the issue:

https://github.com/apache/spark/pull/15525
  
Ah alright thanks. Can you submit a new pr so I can merge it again?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15543: [SPARK-18001] [Document] fix broke link to SparkDataFram...

2016-10-18 Thread rxin
Github user rxin commented on the issue:

https://github.com/apache/spark/pull/15543
  
Merging in master/branch-2.0. Thanks!



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15525: [SPARK-17985][CORE] Bump commons-lang3 version to 3.5.

2016-10-18 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/15525
  
@rxin I submitted a new pr #15548. Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15548: [SPARK-17985][CORE] Bump commons-lang3 version to 3.5.

2016-10-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15548
  
**[Test build #67168 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67168/consoleFull)**
 for PR 15548 at commit 
[`f318dff`](https://github.com/apache/spark/commit/f318dffd4137c20bdc67ac054e345d55703d96de).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15547: [SPARK-18002][SQL] Pruning unnecessary IsNotNull predica...

2016-10-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15547
  
**[Test build #67169 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67169/consoleFull)**
 for PR 15547 at commit 
[`a264eba`](https://github.com/apache/spark/commit/a264ebac6924ae7184abe6fc75fa0dfb659b7d54).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83997070
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the 
internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of 
Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment 
strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is 
accepted or not so that
+  // the assigner can decide whether the current worker is valid for the 
next offering.
+  def offerAccepted(assigned: Boolean): 

[GitHub] spark issue #15534: [SPARK-17917][Scheduler] Fire SparkListenerTasksStarved ...

2016-10-18 Thread rxin
Github user rxin commented on the issue:

https://github.com/apache/spark/pull/15534
  
If a job is submitted and there is no task start update for a while?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15550: [SPARK-18003][Spark Core] Fix bug of RDD zipWithI...

2016-10-18 Thread WeichenXu123
GitHub user WeichenXu123 opened a pull request:

https://github.com/apache/spark/pull/15550

[SPARK-18003][Spark Core] Fix bug of RDD zipWithIndex generating wrong 
result when one partition contains more than 2147483647 records

## What changes were proposed in this pull request?

 Fix bug of RDD zipWithIndex generating wrong result when one partition 
contains more than 2147483647 records.

## How was this patch tested?

test added.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/WeichenXu123/spark 
fix_rdd_zipWithIndex_overflow

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15550.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15550


commit 18c3f4914ca3fb5800dd68ac9d689b6f11cb2e92
Author: WeichenXu 
Date:   2016-10-18T01:59:24Z

update.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15546: [SPARK-17982][SQL] SQLBuilder should wrap the generated ...

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15546
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/67165/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrainedSche...

2016-10-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15481
  
**[Test build #67173 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67173/consoleFull)**
 for PR 15481 at commit 
[`7d86054`](https://github.com/apache/spark/commit/7d86054cf97c81abfcbf3737a555bbd91c77e8a4).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15546: [SPARK-17982][SQL] SQLBuilder should wrap the generated ...

2016-10-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15546
  
**[Test build #67172 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67172/consoleFull)**
 for PR 15546 at commit 
[`105c36a`](https://github.com/apache/spark/commit/105c36ab7ee014171f215114e640d27eaf3ece42).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrai...

2016-10-18 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15481#discussion_r83991857
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -393,7 +393,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 // Remove all the lingering executors that should be removed but not 
yet. The reason might be
 // because (1) disconnected event is not yet received; (2) executors 
die silently.
 executorDataMap.toMap.foreach { case (eid, _) =>
-  driverEndpoint.askWithRetry[Boolean](
+  driverEndpoint.send(
--- End diff --

This call is actually just sending the message to the same process. retry 
is not needed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15434: [SPARK-17873][SQL] ALTER TABLE RENAME TO should allow us...

2016-10-18 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/15434
  
LGTM. Merging to master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrainedSche...

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15481
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/67155/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrainedSche...

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15481
  
Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15542: [SPARK-17996][SQL] Fix unqualified catalog.getFunction(....

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15542
  
Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15542: [SPARK-17996][SQL] Fix unqualified catalog.getFunction(....

2016-10-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15542
  
**[Test build #67161 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67161/consoleFull)**
 for PR 15542 at commit 
[`6068e44`](https://github.com/apache/spark/commit/6068e44a0ca7128105f95bc438ed958b57c11f55).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15540: [MINOR][Documentation] Mention that short names c...

2016-10-18 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/15540#discussion_r83995294
  
--- Diff: docs/sql-programming-guide.md ---
@@ -421,9 +421,10 @@ In the simplest form, the default data source 
(`parquet` unless otherwise config
 
 You can also manually specify the data source that will be used along with 
any extra options
 that you would like to pass to the data source. Data sources are specified 
by their fully qualified
-name (i.e., `org.apache.spark.sql.parquet`), but for built-in sources you 
can also use their short
-names (`json`, `parquet`, `jdbc`, `orc`, `libsvm`, `csv`, `text`). 
DataFrames loaded from any data
-source type can be converted into other types using this syntax.
+name (i.e., `org.apache.spark.sql.parquet`) or short name specified in
+`DataSourceRegister.shortName()` in their implementation. For built-in 
sources you can use their
--- End diff --

Makes sense. Thank you for your quick response.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15525: [SPARK-17985][CORE] Bump commons-lang3 version to 3.5.

2016-10-18 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/15525
  
I see. I'll submit soon.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15540: [MINOR][Documentation] Mention that short names c...

2016-10-18 Thread HyukjinKwon
Github user HyukjinKwon closed the pull request at:

https://github.com/apache/spark/pull/15540


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15539: [SPARK-17994] [SQL] Add back a file status cache for cat...

2016-10-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15539
  
**[Test build #67160 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67160/consoleFull)**
 for PR 15539 at commit 
[`b9272c2`](https://github.com/apache/spark/commit/b9272c2a51ab981a6343a43931f57d03e931aab1).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15539: [SPARK-17994] [SQL] Add back a file status cache for cat...

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15539
  
Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15539: [SPARK-17994] [SQL] Add back a file status cache for cat...

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15539
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/67160/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15297: [WIP][SPARK-9862]Handling data skew

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15297
  
Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83998540
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the 
internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of 
Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment 
strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is 
accepted or not so that
+  // the assigner can decide whether the current worker is valid for the 
next offering.
+  def offerAccepted(assigned: Boolean): 

[GitHub] spark pull request #15546: [SPARK-17892][SQL] SQLBuilder should wrap the gen...

2016-10-18 Thread dongjoon-hyun
GitHub user dongjoon-hyun opened a pull request:

https://github.com/apache/spark/pull/15546

[SPARK-17892][SQL] SQLBuilder should wrap the generated SQL with 
parenthesis for LIMIT

## What changes were proposed in this pull request?

Currently, `SQLBuilder` handles `LIMIT` by simply adding `LIMIT` at the end 
of the generated subSQL. It makes `RuntimeException`s like the following. This 
PR adds a parenthesis to prevent it.

**Before**
```scala
scala> sql("CREATE TABLE tbl(id INT)")
scala> sql("CREATE VIEW v1(id2) AS SELECT id FROM tbl LIMIT 2")
java.lang.RuntimeException: Failed to analyze the canonicalized SQL: ...
```

**After**
```scala
scala> sql("CREATE TABLE tbl(id INT)")
scala> sql("CREATE VIEW v1(id2) AS SELECT id FROM tbl LIMIT 2")
scala> sql("SELECT id2 FROM v1")
res4: org.apache.spark.sql.DataFrame = [id2: int]
```

## How was this patch tested?

Pass the Jenkins test with a newly added test case.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dongjoon-hyun/spark SPARK-17982

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15546.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15546


commit 85c0686a90395f3a7b56f22d78654e83e7ede7a6
Author: Dongjoon Hyun 
Date:   2016-10-19T03:15:48Z

[SPARK-17892][SQL] SQLBuilder should wrap the generated SQL with 
parenthesis for LIMIT




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15544
  
Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15544
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/67164/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...

2016-10-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15544
  
**[Test build #67164 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67164/consoleFull)**
 for PR 15544 at commit 
[`e274ef2`](https://github.com/apache/spark/commit/e274ef22b96ca878df326675e28566cfff6b5088).
 * This patch **fails MiMa tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15297: [WIP][SPARK-9862]Handling data skew

2016-10-18 Thread YuhuWang2002
Github user YuhuWang2002 commented on the issue:

https://github.com/apache/spark/pull/15297
  
following the review comment, I rewrite code for read a range of 
maps。like this:
class BlockStoreShuffleReader[K, C](
handle: BaseShuffleHandle[K, _, C],
startPartition: Int,
endPartition: Int,
context: TaskContext,
serializerManager: SerializerManager = SparkEnv.get.serializerManager,
blockManager: BlockManager = SparkEnv.get.blockManager,
mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker,
startMapId: Option[Int] = None,
endMapId: Option[Int] = None)

To decide how many range for read from the maps。Use the 
spark.sql.adaptive.skewjoin.threshold value。We think the output size less 
than the skew threshold, It can handling in one task,else we split to many 
task,which every one task handing data size slightly less the skew threshold



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15541: [SPARK-17637][Scheduler]Packed scheduling for Spark task...

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15541
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/67157/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15541: [SPARK-17637][Scheduler]Packed scheduling for Spark task...

2016-10-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15541
  
**[Test build #67157 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67157/consoleFull)**
 for PR 15541 at commit 
[`75cdd1a`](https://github.com/apache/spark/commit/75cdd1a77a227fa492a09e93794d4ea7be8a020f).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class OfferState(val workOffer: WorkerOffer) `
  * `class RoundRobinAssigner extends TaskAssigner `
  * `class BalancedAssigner extends TaskAssigner `
  * `class PackedAssigner extends TaskAssigner `


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15541: [SPARK-17637][Scheduler]Packed scheduling for Spark task...

2016-10-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15541
  
Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15540: [MINOR][Documentation] Mention that short names c...

2016-10-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15540#discussion_r83994842
  
--- Diff: docs/sql-programming-guide.md ---
@@ -421,9 +421,10 @@ In the simplest form, the default data source 
(`parquet` unless otherwise config
 
 You can also manually specify the data source that will be used along with 
any extra options
 that you would like to pass to the data source. Data sources are specified 
by their fully qualified
-name (i.e., `org.apache.spark.sql.parquet`), but for built-in sources you 
can also use their short
-names (`json`, `parquet`, `jdbc`, `orc`, `libsvm`, `csv`, `text`). 
DataFrames loaded from any data
-source type can be converted into other types using this syntax.
+name (i.e., `org.apache.spark.sql.parquet`) or short name specified in
+`DataSourceRegister.shortName()` in their implementation. For built-in 
sources you can use their
--- End diff --

hm this isn't clear to the end user at all, since there is no class called 
DataSourceRegister even in the source code for data sources. I'd just leave it 
out. It should be the job of the data source to tell users how to use it.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15549: Bootstrap perf

2016-10-18 Thread ahshahid
Github user ahshahid closed the pull request at:

https://github.com/apache/spark/pull/15549


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



  1   2   3   4   5   6   >