[GitHub] spark pull request #20480: [Spark-23306] Fix the oom caused by contention

2018-02-01 Thread zhzhan
GitHub user zhzhan opened a pull request:

https://github.com/apache/spark/pull/20480

[Spark-23306] Fix the oom caused by contention

## What changes were proposed in this pull request?

There is a race condition in TaskMemoryManager that may cause an OOM.

The memory released by a spill may be taken by another task because there is 
a gap between releaseMemory and acquireMemory (e.g., in UnifiedMemoryManager). 
If the current consumer is the only one that can spill, the request then fails 
with an OOM. This can happen with BytesToBytesMap, as it only spills the 
required bytes.

The fix is to loop on the current consumer while it still has memory to release.
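
A minimal sketch of the race and of the retry loop, written as a toy model rather than Spark code. `Pool`, `Consumer`, `RetryOnContention`, and the `onRelease` hook are hypothetical names introduced only for illustration; the hook simulates another task grabbing memory in the gap between release and acquire.

```scala
// Toy model only: these classes are hypothetical stand-ins, not Spark's
// MemoryManager / MemoryConsumer API.
final class Pool(var free: Long) {
  // Hook that simulates another task racing in between release and acquire.
  var onRelease: () => Unit = () => ()

  def acquire(bytes: Long): Long = {
    val granted = math.min(bytes, free)
    free -= granted
    granted
  }

  def release(bytes: Long): Unit = {
    free += bytes
    onRelease()
  }
}

final class Consumer(pool: Pool, var used: Long) {
  // Spills only what was asked for, as the description says BytesToBytesMap does.
  def spill(required: Long): Long = {
    val freed = math.min(required, used)
    used -= freed
    if (freed > 0) pool.release(freed)
    freed
  }
}

object RetryOnContention {
  // Spill once, then acquire once: a stolen release leaves the request short.
  def acquireOnce(pool: Pool, self: Consumer, required: Long): Long = {
    var got = pool.acquire(required)
    if (got < required) {
      self.spill(required - got)
      got += pool.acquire(required - got)
    }
    got
  }

  // Keep spilling the current consumer while it still has memory to release.
  def acquireWithRetry(pool: Pool, self: Consumer, required: Long): Long = {
    var got = pool.acquire(required)
    var freed = 1L // sentinel so the loop body runs at least once when short
    while (got < required && freed > 0) {
      freed = self.spill(required - got)
      got += pool.acquire(required - got)
    }
    got
  }

  // Builds an empty pool in which a competing task steals the first released chunk.
  def contendedPool(): Pool = {
    val pool = new Pool(0L)
    var stolen = false
    pool.onRelease = () => if (!stolen) { stolen = true; pool.acquire(Long.MaxValue); () }
    pool
  }
}
```

With a consumer holding 100 bytes and a request for 40, `acquireOnce` comes back empty-handed in this model because the single spill is stolen, while `acquireWithRetry` spills a second time and gets the 40 bytes.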

## How was this patch tested?

The race condition is hard to reproduce, but the current logic appears to 
cause the issue.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhzhan/spark oom

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20480.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20480


commit df96f0c126833b0e812cd715ae1538dbd38afac4
Author: Zhan Zhang <zhanzhang@...>
Date:   2018-01-12T19:51:19Z

fix the oom caused by contention




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17180: [SPARK-19839][Core]release longArray in BytesToBytesMap

2017-07-27 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/17180
  
retest it please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---




[GitHub] spark issue #17180: [SPARK-19839][Core]release longArray in BytesToBytesMap

2017-07-26 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/17180
  
Will fix the unit test.





[GitHub] spark pull request #18694: [SPARK-21492][SQL] Memory leak in SortMergeJoin

2017-07-26 Thread zhzhan
Github user zhzhan closed the pull request at:

https://github.com/apache/spark/pull/18694





[GitHub] spark issue #18694: [SPARK-21492][SQL] Memory leak in SortMergeJoin

2017-07-26 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/18694
  
Closing the PR. I will work on adding a close interface for the iterators used 
in Spark SQL to remove the extra overhead.





[GitHub] spark issue #17180: [SPARK-19839][Core]release longArray in BytesToBytesMap

2017-07-24 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/17180
  
The test failure is caused by calling a method on the map after 
`destructiveIterator()` has been called, which is illegal by definition:

https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java#L417

We should remove this test case, as it does not follow that restriction. 
Please let me know what you think.





[GitHub] spark issue #17180: [SPARK-19839][Core]release longArray in BytesToBytesMap

2017-07-24 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/17180
  
Per review comments, the longArray is now released when the destructive iterator is created.





[GitHub] spark issue #18694: [SPARK-21492][SQL] Memory leak in SortMergeJoin

2017-07-21 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/18694
  
Currently the patch helps in scenarios such as Join(A, Join(B, C)). It is 
critical for us because we have some internal development in which each stage 
may consist of tens of sort operators. We found that each operator holds on to 
its memory without releasing the current page, which causes a lot of spills. 
That makes this memory leak critical (ShuffledHashJoin has a similar issue; we 
did not hit issues caused by Limit).

To me, the leak itself is a bug. If it is agreed that we should fix this 
type of leak, we can find a more elegant way, such as a new close() interface, 
to avoid the overhead.





[GitHub] spark issue #18694: [SPARK-21492][SQL] Memory leak in SortMergeJoin

2017-07-21 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/18694
  
If we assume the pipeline is simple enough that each stage has only one 
operator that needs to spill, you are right. But if the pipeline is more 
complex, for example with multiple operators that need to spill, this leak can 
cause serious issues.

A more elegant way is to expose a new interface, e.g., close(), on 
RowIterator. If that is agreed, we can implement it and solve the issue without 
the overhead.
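
One possible shape for such an interface, as a sketch only. `ClosableRowIterator` and `BufferedRows` are hypothetical names; Spark's `RowIterator` exposes `advanceNext()` and `getRow`, and the `close()` method here is the proposed addition, not an existing API. The `onFree` callback stands in for releasing sorter pages.

```scala
// Hypothetical sketch of the proposed interface; not existing Spark code.
trait ClosableRowIterator[T] {
  def advanceNext(): Boolean
  def getRow: T
  def close(): Unit // proposed: release resources without draining the rows
}

final class BufferedRows[T](rows: Vector[T], onFree: () => Unit)
    extends ClosableRowIterator[T] {
  private var pos = -1
  private var freed = false
  var rowsVisited = 0 // counts the work done through advanceNext()

  private def free(): Unit = if (!freed) { freed = true; onFree() }

  def advanceNext(): Boolean =
    if (pos + 1 < rows.length) { pos += 1; rowsVisited += 1; true }
    else { free(); false } // without close(), memory is freed only on exhaustion

  def getRow: T = rows(pos)

  def close(): Unit = {
    pos = rows.length - 1 // later advanceNext() calls report exhaustion
    free()
  }
}
```

In this model, draining a 1000-row iterator to trigger the release visits all 1000 rows, while `close()` after the first row releases the memory having visited only one.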





[GitHub] spark issue #18694: [SPARK-21492][SQL] Memory leak in SortMergeJoin

2017-07-20 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/18694
  
The cleanup hook runs only after the task is done. The diff solves the leak 
for SortMergeJoin only and does not apply to the Limit case. Limit is another 
special case and needs to be taken care of separately.





[GitHub] spark pull request #18694: [SPARK-21492][SQL] Memory leak in SortMergeJoin

2017-07-20 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18694#discussion_r128683903
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -649,6 +660,11 @@ private[joins] class SortMergeJoinScanner(
   // Initialization (note: do _not_ want to advance streamed here).
   advancedBufferedToRowWithNullFreeJoinKey()
 
+  def destruct(): Unit = {
+    while (streamedIter.advanceNext()) {}
+    while (bufferedIter.advanceNext()) {}
--- End diff --

Details are explained below.






[GitHub] spark issue #18694: [SPARK-21492][SQL] Memory leak in SortMergeJoin

2017-07-20 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/18694
  
The memory leak happens in the following scenario. In an inner join, for 
example, once the left side is exhausted we stop advancing the right side. 
Because the right side never reaches its end, the memory it holds is not 
released and cannot be used by any other operator (for example, 
UnsafeShuffleWriter), which causes more spills.

I will locate the code in UnsafeExternalSorter that prevents the memory from 
being released.
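
The scenario can be reproduced in a toy model. This is a sketch with hypothetical names (`SortedSide`, `InnerJoinLeak`), not the SortMergeJoin code: each side frees its pretend sort buffer only when it is read to the end, and the join stops as soon as either side runs out.

```scala
// Toy model: an input that frees its (pretend) sort buffer only on exhaustion.
final class SortedSide(keys: Seq[Int]) {
  private val it = keys.iterator
  var released = false
  def next(): Option[Int] =
    if (it.hasNext) Some(it.next()) else { released = true; None }
}

object InnerJoinLeak {
  // Inner merge join on unique sorted keys; stops as soon as either side ends.
  def join(left: SortedSide, right: SortedSide): List[Int] = {
    val out = List.newBuilder[Int]
    var l = left.next()
    var r = right.next()
    while (l.isDefined && r.isDefined) {
      if (l.get < r.get) l = left.next()
      else if (l.get > r.get) r = right.next()
      else { out += l.get; l = left.next(); r = right.next() }
    }
    out.result()
  }
}
```

Joining keys `1, 2` against `1, 2, 3, 4, 5` returns both matches and releases the left side, but the right side is abandoned at key 3 with `released` still false, which is the leak described above.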





[GitHub] spark pull request #18694: [SPARK-21492][SQL] Memory leak in SortMergeJoin

2017-07-20 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18694#discussion_r128679491
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -649,6 +660,11 @@ private[joins] class SortMergeJoinScanner(
   // Initialization (note: do _not_ want to advance streamed here).
   advancedBufferedToRowWithNullFreeJoinKey()
 
+  def destruct(): Unit = {
+    while (streamedIter.advanceNext()) {}
+    while (bufferedIter.advanceNext()) {}
--- End diff --

It does introduce extra overhead. The alternative is to introduce a new 
interface that lets a RowIterator destruct itself. But a memory leak is worse 
than the extra overhead, because it causes more spills.





[GitHub] spark pull request #18694: [SPARK-21492][SQL] Memory leak in SortMergeJoin

2017-07-20 Thread zhzhan
GitHub user zhzhan opened a pull request:

https://github.com/apache/spark/pull/18694

[SPARK-21492][SQL] Memory leak in SortMergeJoin

## What changes were proposed in this pull request?
Fix the memory leak in SortMergeJoin.

## How was this patch tested?
Relies on existing unit tests. Also tested in a production job, where the 
diff fixes the memory leak.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhzhan/spark leak

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18694.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18694


commit b8acae2d35c342a117222a3a0cc111f31bd4b4c4
Author: Zhan Zhang <zhanzh...@fb.com>
Date:   2017-07-20T20:25:16Z

fix memory leak on SortMergeJoin







[GitHub] spark pull request #17180: [SPARK-19839][Core]release longArray in BytesToBy...

2017-03-06 Thread zhzhan
GitHub user zhzhan opened a pull request:

https://github.com/apache/spark/pull/17180

[SPARK-19839][Core]release longArray in BytesToBytesMap

## What changes were proposed in this pull request?
When BytesToBytesMap spills, its longArray should be released. Otherwise, 
it may not be released until the task completes. This array may take a 
significant amount of memory, which then cannot be used by later operators, 
such as UnsafeShuffleExternalSorter, resulting in more frequent spills in the 
sorter. This patch releases the array, as the destructive iterator will not use 
it anymore.
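
A miniature of the idea, as a sketch only. `TinyMap` is a hypothetical stand-in and not the real BytesToBytesMap: its `index` array plays the role of longArray, and the iterator walks the stored records directly, so the index can be dropped as soon as the destructive iterator is created.

```scala
// Hypothetical miniature of the idea; not the real BytesToBytesMap.
final class TinyMap(capacity: Int) {
  // Stands in for longArray: a lookup index that iteration never reads.
  private var index: Array[Long] = new Array[Long](capacity)
  private val records = scala.collection.mutable.ArrayBuffer.empty[(Long, Long)]
  private var destructed = false

  def indexBytes: Long = if (index == null) 0L else index.length * 8L

  def put(key: Long, value: Long): Unit = {
    require(!destructed, "map must not be used after destructiveIterator()")
    // Toy index update with no collision handling.
    index((key.hashCode & Int.MaxValue) % capacity) = records.length.toLong
    records += ((key, value))
  }

  // Iterates the records directly, so the index is dead weight from here on.
  def destructiveIterator(): Iterator[(Long, Long)] = {
    destructed = true
    index = null // release the index now instead of at task completion
    records.iterator
  }
}
```

With a capacity of 1024, the index accounts for 8192 bytes until `destructiveIterator()` is called and 0 afterwards, while the iterator still yields every record. The `require` in `put` mirrors the restriction that the map must not be used after the destructive iterator has been created.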

## How was this patch tested?
Manual test in production

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhzhan/spark memory

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17180.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17180


commit 562625621f701c77e7755a82c9d9551688f97684
Author: Zhan Zhang <zhanzh...@fb.com>
Date:   2017-03-06T20:02:01Z

release longArray in BytesToBytesMap







[GitHub] spark issue #17155: [SPARK-19815][SQL] Not orderable should be applied to ri...

2017-03-03 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/17155
  
@gatorsmile Thanks for reviewing this. I thought through the logic again. 
On the surface, the existing logic may be correct, since in a join the left 
and right keys should be of the same type. Will close the PR.





[GitHub] spark pull request #17155: [SPARK-19815][SQL] Not orderable should be applie...

2017-03-03 Thread zhzhan
Github user zhzhan closed the pull request at:

https://github.com/apache/spark/pull/17155





[GitHub] spark pull request #17155: [SPARK-19815][SQL] Not orderable should be appli...

2017-03-03 Thread zhzhan
GitHub user zhzhan opened a pull request:

https://github.com/apache/spark/pull/17155

[SPARK-19815][SQL] Not orderable should be applied to right key instead of 
left key

## What changes were proposed in this pull request?
Change the orderable condition.

## How was this patch tested?
Relies on existing test.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhzhan/spark hashjoin

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17155.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17155


commit 91a61658923418e648bd8960feb6b7e09ef6f915
Author: Zhan Zhang <zhanzh...@fb.com>
Date:   2017-03-03T23:35:03Z

the rightkey should not be orderable instead of left key







[GitHub] spark issue #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeRowArray...

2017-02-14 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/16909
  
@hvanhovell @davies Correct me if I am wrong. My understanding is that the 
following code will go through all matching rows on the right side and put them 
into the BufferedRowIterator. If there is an OOM caused by the ArrayList matches 
in SortMergeJoinExec, the memory usage will be doubled by currentRows in 
BufferedRowIterator (assuming the left and right have the same size).

There are two ways to solve it. One is to consume one row at a time, and the 
other is to make BufferedRowIterator spillable (which should be much easier 
based on this PR).


https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L556
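
To make the doubling concrete, here is a row-count model of the two options, as a sketch under simplifying assumptions. `JoinBuffering`, `eagerPeakRows`, and `lazyRows` are hypothetical names; rows are plain `Int`s and "memory" is just the number of rows alive at once.

```scala
object JoinBuffering {
  // Eager: every joined row is appended to an output buffer before any is consumed.
  def eagerPeakRows(matches: Vector[Int], streamedRow: Int): Int = {
    val currentRows = scala.collection.mutable.ListBuffer.empty[(Int, Int)]
    matches.foreach(m => currentRows += ((streamedRow, m)))
    matches.size + currentRows.size // both buffers are live at the same time
  }

  // Lazy: joined rows are produced on demand, so only one is live at a time.
  def lazyRows(matches: Vector[Int], streamedRow: Int): Iterator[(Int, Int)] =
    matches.iterator.map(m => (streamedRow, m))
}
```

For 1000 buffered matches, the eager variant holds 2000 rows at its peak in this model, whereas the lazy variant holds the 1000 matches plus the single joined row being consumed.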





[GitHub] spark issue #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeRowArray...

2017-02-13 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/16909
  
@tejasapatil Do you want to fix the BufferedRowIterator for 
WholeStageCodegenExec as well? For inner join, the LinkedList currentRows 
would cause the same issue, as it buffers the rows from the inner join and 
takes more memory (probably double if the left and right have similar sizes). 
They could also share a similar iterator data structure.





[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-08 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r91570259
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
@@ -487,6 +489,26 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     assert(count4 == 1)
     sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
+      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
+      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
+      val testData = spark.range(10).repartition(1)
+
+      // Expected Max(s) is 10 as statefulUDF returns the sequence number starting from 1.
+      checkAnswer(testData.selectExpr("statefulUDF() as s").agg(max($"s")), Row(10))
+
+      // Expected Max(s) is 5 as statefulUDF returns the sequence number starting from 1,
+      // and the data is evenly distributed into 2 partitions.
--- End diff --

  case logical.Repartition(numPartitions, shuffle, child) =>
    if (shuffle) {
      ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
    } else {
      execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
    }

  case logical.RepartitionByExpression(expressions, child, nPartitions) =>
    exchange.ShuffleExchange(HashPartitioning(
      expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil





[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-08 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r91569919
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
@@ -487,6 +489,26 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     assert(count4 == 1)
     sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
+      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
+      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
+      val testData = spark.range(10).repartition(1)
+
+      // Expected Max(s) is 10 as statefulUDF returns the sequence number starting from 1.
+      checkAnswer(testData.selectExpr("statefulUDF() as s").agg(max($"s")), Row(10))
+
+      // Expected Max(s) is 5 as statefulUDF returns the sequence number starting from 1,
+      // and the data is evenly distributed into 2 partitions.
--- End diff --

My understanding is that repartition uses RoundRobinPartitioning, while 
repartition(2, $"id" < 5) uses the hash implementation. In this case, I found 
that all rows are shuffled to one partition.
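
One way this can happen: a boolean expression has only two distinct values, so if both hash to the same residue modulo the partition count, every row lands in the same partition. The sketch below illustrates the effect with the JVM's `Boolean.hashCode` as a stand-in hash; Spark's HashPartitioning uses its own Murmur3-based hash, so this shows the mechanism and is not a reproduction of Spark's exact values. `BooleanHashPartitioning`, `pmod`, and `partitionOf` are names chosen for the example.

```scala
object BooleanHashPartitioning {
  // Non-negative modulo, the usual way a hash is mapped to a partition id.
  def pmod(hash: Int, n: Int): Int = ((hash % n) + n) % n

  // Stand-in hash: the JVM's Boolean.hashCode (1231 for true, 1237 for false).
  // This is an assumption for illustration; Spark hashes rows differently.
  def partitionOf(id: Long, numPartitions: Int): Int =
    pmod(java.lang.Boolean.hashCode(id < 5), numPartitions)

  // Round-robin placement for comparison: rows alternate between partitions.
  def roundRobin(rowIndex: Int, numPartitions: Int): Int = rowIndex % numPartitions
}
```

Both stand-in hash codes are odd, so with two partitions every id from 0 to 9 maps to partition 1, whereas round-robin placement puts five rows in each partition.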





[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-06 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r91142141
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
@@ -487,6 +488,52 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     assert(count4 == 1)
     sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
+      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
+      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
+      withTempView("inputTable") {
--- End diff --

The test cannot resolve the function and throws an error if I use:

  test("Hive Stateful UDF") {
    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
      val testData = spark.range(10).repartition(1)
      println(s"start session: $spark")
      val m = testData.select("statefulUDF() as s, ")
      checkAnswer(testData.select("statefulUDF() as s").agg(max($"s")), Row(10))
      ...

Am I missing anything, or is it a bug? I will investigate why this happens.

Failed to analyze query: org.apache.spark.sql.AnalysisException: cannot resolve '`statefulUDF() as s`' given input columns: [id];;
'Project ['statefulUDF() as s]
+- Repartition 1, true
   +- Range (0, 10, step=1, splits=Some(1))

org.apache.spark.sql.AnalysisException: cannot resolve '`statefulUDF() as s`' given input columns: [id];;
'Project ['statefulUDF() as s]
+- Repartition 1, true
   +- Range (0, 10, step=1, splits=Some(1))

at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:314)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:314)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:313)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:282)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:292)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:296)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:296)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$7.apply(QueryPlan.scala:301)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:192)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:301)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:74)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:132)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
at org.apache.spark.sql.Dataset$.ofRows(Da

[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-05 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r91026585
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
@@ -487,6 +488,52 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     assert(count4 == 1)
     sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
+      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
+      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
+      withTempView("inputTable") {
+        val testData = spark.sparkContext.parallelize(
+          (0 until 10) map (x => IntegerCaseClass(1)), 2).toDF()
+        testData.createOrReplaceTempView("inputTable")
+        // Distribute all rows to one partition (all rows have the same content),
--- End diff --

@cloud-fan Thanks for the review. Because all rows contain only 
IntegerCaseClass(1), RepartitionByExpression will assign all rows to one 
partition, which then has 10 records.





[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-05 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r91026433
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
@@ -487,6 +488,52 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     assert(count4 == 1)
     sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
+      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
+      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
+      withTempView("inputTable") {
+        val testData = spark.sparkContext.parallelize(
+          (0 until 10) map (x => IntegerCaseClass(1)), 2).toDF()
+        testData.createOrReplaceTempView("inputTable")
+        // Distribute all rows to one partition (all rows have the same content),
+        // and expected Max(s) is 10 as statefulUDF returns the sequence number starting from 1.
+        checkAnswer(
+          sql(
+            """
+            |SELECT MAX(s) FROM
+            |  (SELECT statefulUDF() as s FROM
+            |    (SELECT i from inputTable DISTRIBUTE by i) a
+            |  ) b
+          """.stripMargin),
+          Row(10))
+
+        // Expected Max(s) is 5, as there are 2 partitions with 5 rows each, and statefulUDF
+        // returns the sequence number of the rows in the partition starting from 1.
+        checkAnswer(
+          sql(
+            """
+              |SELECT MAX(s) FROM
+              |  (SELECT statefulUDF() as s FROM
+              |    (SELECT i from inputTable) a
+              |  ) b
+            """.stripMargin),
+          Row(5))
+
+        // Expected Max(s) is 1, as stateless UDF is deterministic and replaced by constant 1.
--- End diff --

StatelessUDF is foldable:
  override def foldable: Boolean = isUDFDeterministic && children.forall(_.foldable)

The ConstantFolding optimizer rule replaces it with a constant:
  case e if e.foldable => Literal.create(e.eval(EmptyRow), e.dataType)

Here is the output of explain(true):
== Parsed Logical Plan ==
'Project [unresolvedalias('MAX('s), None)]
+- 'SubqueryAlias b
   +- 'Project ['statelessUDF() AS s#39]
      +- 'SubqueryAlias a
         +- 'RepartitionByExpression ['i]
            +- 'Project ['i]
               +- 'UnresolvedRelation `inputTable`

== Analyzed Logical Plan ==
max(s): bigint
Aggregate [max(s#39L) AS max(s)#46L]
+- SubqueryAlias b
   +- Project [HiveSimpleUDF#org.apache.spark.sql.hive.execution.StatelessUDF() AS s#39L]
      +- SubqueryAlias a
         +- RepartitionByExpression [i#4]
            +- Project [i#4]
               +- SubqueryAlias inputtable
                  +- SerializeFromObject [assertnotnull(assertnotnull(input[0, org.apache.spark.sql.hive.execution.IntegerCaseClass, true], top level Product input object), - root class: "org.apache.spark.sql.hive.execution.IntegerCaseClass").i AS i#4]
                     +- ExternalRDD [obj#3]

== Optimized Logical Plan ==
Aggregate [max(s#39L) AS max(s)#46L]
+- Project [1 AS s#39L]
   +- RepartitionByExpression [i#4]
      +- SerializeFromObject [assertnotnull(assertnotnull(input[0, org.apache.spark.sql.hive.execution.IntegerCaseClass, true], top level Product input object), - root class: "org.apache.spark.sql.hive.execution.IntegerCaseClass").i AS i#4]
         +- ExternalRDD [obj#3]

== Physical Plan ==
*HashAggregate(keys=[], functions=[max(s#39L)], output=[max(s)#46L])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_max(s#39L)], output=[max#48L])
      +- *Project [1 AS s#39L]
         +- Exchange hashpartitioning(i#4, 5)
            +- *SerializeFromObject [assertnotnull(assertnotnull(input[0, org.apache.spark.sql.hive.execution.IntegerCaseClass, true], top level Product input object), - root class: "org.apache.spark.sql.hive.execution.IntegerCaseClass").i AS i#4]
               +- Scan ExternalRDDScan[obj#3]
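
The folding behaviour visible in the optimized plan can be illustrated with a toy expression tree. This is only a sketch under stated assumptions: `Expr`, `Literal`, `Column`, `Udf`, and `constantFold` are hypothetical stand-ins written for this example, not Catalyst classes, and the rule is a simplified analogue of the `case e if e.foldable` line quoted above.

```scala
object FoldingSketch {
  sealed trait Expr {
    def foldable: Boolean
    def eval(): Long
  }

  final case class Literal(value: Long) extends Expr {
    val foldable = true
    def eval(): Long = value
  }

  // A column depends on the input row, so it can never be folded.
  final case class Column(name: String) extends Expr {
    val foldable = false
    def eval(): Long =
      throw new UnsupportedOperationException(s"column $name needs an input row")
  }

  // Hypothetical UDF node: foldable only if deterministic and all children fold.
  final case class Udf(
      name: String,
      deterministic: Boolean,
      children: Seq[Expr],
      fn: Seq[Long] => Long) extends Expr {
    def foldable: Boolean = deterministic && children.forall(_.foldable)
    def eval(): Long = fn(children.map(_.eval()))
  }

  // Replace every foldable subtree with the literal it evaluates to.
  def constantFold(e: Expr): Expr = e match {
    case lit: Literal => lit
    case other if other.foldable => Literal(other.eval())
    case udf: Udf => udf.copy(children = udf.children.map(constantFold))
    case other => other
  }
}
```

In this model a deterministic zero-argument UDF collapses to a literal before any row is read, while a UDF marked non-deterministic survives folding and is still invoked per row.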






[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-03 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r90763121
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala 
---
@@ -487,6 +488,29 @@ class HiveUDFSuite extends QueryTest with 
TestHiveSingleton with SQLTestUtils {
 assert(count4 == 1)
 sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS 
'${classOf[StatefulUDF].getName}'")
+sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS 
'${classOf[StatelessUDF].getName}'")
+val testData = spark.sparkContext.parallelize(
+  (0 until 10) map(x => IntegerCaseClass(1)), 2).toDF()
+testData.createOrReplaceTempView("inputTable")
+val max1 =
+  sql("SELECT MAX(s) FROM (" +
+"SELECT statefulUDF() as s FROM (SELECT i from inputTable 
DISTRIBUTE by i) a" +
+") b").head().getLong(0)
--- End diff --

I will rewrite it after gathering feedback from others.





[GitHub] spark issue #16068: [SPARK-18637][SQL]Stateful UDF should be considered as n...

2016-12-03 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/16068
  
@gatorsmile  We cannot use deterministic = true/false, because there are 
existing UDFs that declare deterministic as true and stateful as true as well.





[GitHub] spark issue #16068: [SPARK-18637][SQL]Stateful UDF should be considered as n...

2016-12-03 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/16068
  
My understanding is that a non-deterministic UDF does not need to be 
stateful, but a stateful UDF has to be non-deterministic. 

Here is the comment in Hive regarding this property:

/**
If a UDF stores state based on the sequence of records it has processed, it
is stateful. A stateful UDF cannot be used in certain expressions such as
case statement and certain optimizations such as AND/OR short circuiting
don't apply for such UDFs, as they need to be invoked for each record.
row_sequence is an example of stateful UDF. A stateful UDF is considered to
be non-deterministic, irrespective of what deterministic() returns.
*
@return true
*/
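
The rule stated in that comment can be written down as a tiny sketch. `UdfFlags` and `effectivelyDeterministic` are hypothetical names used only here; they model the two flags under discussion and are not Hive or Spark APIs.

```scala
object UdfTypeSketch {
  // Hypothetical mirror of the two flags discussed above.
  final case class UdfFlags(deterministic: Boolean, stateful: Boolean)

  // A stateful UDF is treated as non-deterministic,
  // irrespective of what its deterministic flag says.
  def effectivelyDeterministic(flags: UdfFlags): Boolean =
    flags.deterministic && !flags.stateful
}
```

Under this reading, a UDF declared with both flags set to true is the case that the deterministic flag alone would misclassify.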





[GitHub] spark issue #16068: [SPARK-18637][SQL]Stateful UDF should be considered as n...

2016-12-01 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/16068
  
@hvanhovell Would you take a look and let me know if you have any 
concerns?





[GitHub] spark issue #16068: [SPARK-18637][SQL]Stateful UDF should be considered as n...

2016-11-30 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/16068
  
@hvanhovell Thanks for looking at this. We have a large number of UDFs with 
this issue. For example, a UDF gives different results under different 
partitioning/sorting, but the UDF is pushed down below the partition/sort, 
resulting in unexpected behavior. I will work on finding some test cases for it.
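
To make the ordering problem concrete, here is a minimal sketch in plain Scala, assuming a row-sequence style UDF that numbers the rows it sees. `RowSequence`, `sortThenNumber`, and `numberThenSort` are hypothetical names for this illustration and do not correspond to Spark operators.

```scala
object PushdownSketch {
  // Hypothetical stateful UDF: returns 1, 2, 3, ... for successive calls.
  final class RowSequence {
    private var n = 0L
    def apply(): Long = { n += 1; n }
  }

  // Intended order: sort first, then number the rows.
  def sortThenNumber(rows: Seq[Int]): List[(Int, Long)] = {
    val seq = new RowSequence
    rows.toList.sorted.map(r => (r, seq()))
  }

  // What happens if the UDF is evaluated below the sort:
  // rows are numbered in input order, and the sort only reorders the pairs.
  def numberThenSort(rows: Seq[Int]): List[(Int, Long)] = {
    val seq = new RowSequence
    rows.toList.map(r => (r, seq())).sortBy(_._1)
  }
}
```

For the input 3, 1, 2 the first function pairs each row with its rank, while the second pairs it with its arrival position, so the two plans disagree even though they differ only in where the UDF runs.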





[GitHub] spark issue #16068: [SPARK-18637][SQL]Stateful UDF should be considered as n...

2016-11-30 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/16068
  
retest it please





[GitHub] spark pull request #16068: stateful udf should be nondeterministic

2016-11-29 Thread zhzhan
GitHub user zhzhan opened a pull request:

https://github.com/apache/spark/pull/16068

stateful udf should be nondeterministic

## What changes were proposed in this pull request?

Make stateful UDFs nondeterministic.

## How was this patch tested?

Mainly relies on existing queries. We also manually checked queries with a 
stateful UDF in the filter. Without the patch, the UDF is mistakenly pushed 
down for efficiency. After the patch, the physical plan is generated correctly.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhzhan/spark state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16068.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16068


commit a4d8b4af648e53c355bee16fe371137d0b349331
Author: Zhan Zhang <zhanzh...@fb.com>
Date:   2016-11-29T23:32:45Z

stateful udf should be nondeterministic







[GitHub] spark issue #15541: [SPARK-17637][Scheduler]Packed scheduling for Spark task...

2016-11-01 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/15541
  
@rxin Thanks for the feedback regarding the TaskAssigner API. The current 
API is designed around the existing logic of TaskSchedulerImpl, where the 
scheduler takes many rounds to assign the tasks for each task set. I have not 
figured out a better way yet. Any suggestions are welcome.
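
For readers following the API discussion, the round-based contract (construct, init, hasNext/next, offerAccepted, tasks) can be sketched roughly as below. This is a simplified illustration, not the code in the pull request: `Worker`, `Offer`, `RoundRobinAssigner`, and `schedule` are hypothetical names, tasks are plain strings, and the worker order is left unshuffled so the result is reproducible.

```scala
import scala.collection.mutable.ArrayBuffer

object AssignerSketch {
  // Hypothetical stand-in for a worker offer.
  final case class Worker(id: String, cores: Int)

  // Tracks remaining cores and the tasks assigned to one worker.
  final class Offer(val worker: Worker) {
    var coresAvailable: Int = worker.cores
    val tasks: ArrayBuffer[String] = ArrayBuffer.empty[String]
  }

  // Hypothetical assigner that walks the offers in order on every round.
  final class RoundRobinAssigner(val cpuPerTask: Int) {
    private var offers: Vector[Offer] = Vector.empty
    private var index = 0

    def construct(workers: Seq[Worker]): Unit = {
      offers = workers.map(w => new Offer(w)).toVector
    }
    def init(): Unit = { index = 0 }
    def hasNext: Boolean = index < offers.length
    def next(): Offer = { val o = offers(index); index += 1; o }
    // Round robin ignores the feedback; other strategies could use it.
    def offerAccepted(accepted: Boolean): Unit = ()
    def tasks: Seq[Seq[String]] = offers.map(_.tasks.toList)
  }

  // Driver loop: keep running rounds until a round launches nothing.
  def schedule(
      assigner: RoundRobinAssigner,
      workers: Seq[Worker],
      pending: Seq[String]): Seq[Seq[String]] = {
    val it = pending.iterator
    assigner.construct(workers)
    var launched = true
    while (launched && it.hasNext) {
      launched = false
      assigner.init()
      while (assigner.hasNext && it.hasNext) {
        val offer = assigner.next()
        val accepted = offer.coresAvailable >= assigner.cpuPerTask
        if (accepted) {
          offer.tasks += it.next()
          offer.coresAvailable -= assigner.cpuPerTask
          launched = true
        }
        assigner.offerAccepted(accepted)
      }
    }
    assigner.tasks
  }
}
```

The multiple rounds are what make the iterator-style interface awkward: each round needs a fresh pass over the offers, which is why the sketch needs both `construct` (once) and `init` (per round).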





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-11-01 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r85985739
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and 
assigned task list. */
+private[scheduler] class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+
+  def assignTask(task: TaskDescription, cpu: Int): Unit = {
+if (coresAvailable < cpu) {
+  throw new SparkException(s"Available cores are less than cpu per 
task" +
+s" ($coresAvailable < $cpu)")
+}
+tasks += task
+coresAvailable -= cpu
+  }
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
when TaskScheduler
+ * performs task assignment given available workers, it first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes multiple rounds to request 
TaskAssigner for task
+ * assignment with different locality restrictions until there is either 
no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize 
the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, 
TaskScheduler invokes the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext/next() is used by 
TaskScheduler
+ * to check the worker availability and retrieve current offering from 
TaskAssigner.
+ *
+ * Fourth, TaskScheduler calls offerAccepted() to notify the TaskAssigner 
so that
+ * TaskAssigner can decide whether the current offer is valid or not for 
the next request.
+ *
+ * Fifth, after task assignment is done, TaskScheduler invokes the 
function tasks to
+ * retrieve all the task assignment information.
+ */
+
+private[scheduler] sealed abstract class TaskAssigner {
+  protected var offer: Seq[OfferState] = _
+  protected var cpuPerTask = 1
+
+  protected def withCpuPerTask(cpuPerTask: Int): TaskAssigner = {
+this.cpuPerTask = cpuPerTask
+this
+  }
+
+  /** The currently assigned offers. */
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  /**
+   * Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+   * By default, offers is randomly shuffled to avoid always placing tasks 
on the same set of
+   * workers.
+   */
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = Random.shuffle(workOffer.map(o => new OfferState(o)))
+  }
+
+  /** Invoked at each round of Taskset assignment to initialize th

[GitHub] spark issue #15541: [SPARK-17637][Scheduler]Packed scheduling for Spark task...

2016-10-25 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/15541
  
@rxin Would you like to take a look and let me know if you have any 
concerns? Thanks.





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-23 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84621076
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -250,24 +251,24 @@ private[spark] class TaskSchedulerImpl(
   private def resourceOfferSingleTaskSet(
   taskSet: TaskSetManager,
   maxLocality: TaskLocality,
-  shuffledOffers: Seq[WorkerOffer],
-  availableCpus: Array[Int],
-  tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
+  taskAssigner: TaskAssigner) : Boolean = {
 var launchedTask = false
-for (i <- 0 until shuffledOffers.size) {
-  val execId = shuffledOffers(i).executorId
-  val host = shuffledOffers(i).host
-  if (availableCpus(i) >= CPUS_PER_TASK) {
+taskAssigner.init()
+while (taskAssigner.hasNext) {
+  var isAccepted = false
+  val currentOffer = taskAssigner.next()
+  val execId = currentOffer.workOffer.executorId
+  val host = currentOffer.workOffer.host
+  if (currentOffer.coresAvailable >= CPUS_PER_TASK) {
 try {
   for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
-tasks(i) += task
+currentOffer.assignTask(task, CPUS_PER_TASK)
 val tid = task.taskId
 taskIdToTaskSetManager(tid) = taskSet
 taskIdToExecutorId(tid) = execId
 executorIdToTaskCount(execId) += 1
-availableCpus(i) -= CPUS_PER_TASK
-assert(availableCpus(i) >= 0)
--- End diff --

@viirya The assert will not fail even in the legacy code, because 
taskSet.resourceOffer(execId, host, maxLocality) returns an Option, so the for 
loop runs at most once.
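
A small standalone sketch of that argument, using only the Scala standard library. `timesRun` and `remainingAfterOffer` are hypothetical helpers written for this example; the second one assumes a non-negative per-task CPU count.

```scala
object OptionLoopSketch {
  // A for loop over an Option runs its body once for Some and never for None.
  def timesRun[A](maybe: Option[A]): Int = {
    var runs = 0
    for (_ <- maybe) runs += 1
    runs
  }

  // Mirrors the shape of the legacy check: the subtraction happens at most
  // once, and only after the guard, so the remainder cannot go negative.
  def remainingAfterOffer(available: Int, cpusPerTask: Int, offer: Option[String]): Int = {
    var remaining = available
    if (remaining >= cpusPerTask) {
      for (_ <- offer) remaining -= cpusPerTask
    }
    remaining
  }
}
```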





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-23 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84619879
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -250,24 +251,24 @@ private[spark] class TaskSchedulerImpl(
   private def resourceOfferSingleTaskSet(
   taskSet: TaskSetManager,
   maxLocality: TaskLocality,
-  shuffledOffers: Seq[WorkerOffer],
-  availableCpus: Array[Int],
-  tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
+  taskAssigner: TaskAssigner) : Boolean = {
 var launchedTask = false
-for (i <- 0 until shuffledOffers.size) {
-  val execId = shuffledOffers(i).executorId
-  val host = shuffledOffers(i).host
-  if (availableCpus(i) >= CPUS_PER_TASK) {
+taskAssigner.init()
+while (taskAssigner.hasNext) {
+  var isAccepted = false
+  val currentOffer = taskAssigner.next()
+  val execId = currentOffer.workOffer.executorId
+  val host = currentOffer.workOffer.host
+  if (currentOffer.coresAvailable >= CPUS_PER_TASK) {
 try {
   for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
-tasks(i) += task
+currentOffer.assignTask(task, CPUS_PER_TASK)
 val tid = task.taskId
 taskIdToTaskSetManager(tid) = taskSet
 taskIdToExecutorId(tid) = execId
 executorIdToTaskCount(execId) += 1
-availableCpus(i) -= CPUS_PER_TASK
-assert(availableCpus(i) >= 0)
--- End diff --

Thanks @viirya for the comments. Actually, I was thinking of removing the 
check, although it is part of the legacy code. Now the check has moved into 
OfferState, which makes more sense. IMHO, the assertion should typically never 
fail, but from OfferState's perspective, it should guarantee this restriction itself.
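
One way to read "OfferState should guarantee the restriction" is that the state object refuses an assignment it cannot afford, rather than trusting every caller. The sketch below assumes that reading; `CoreBudget` is a hypothetical stand-in that uses plain strings for tasks and a standard `IllegalStateException`, and it is not the class from the pull request.

```scala
import scala.collection.mutable.ArrayBuffer

object OfferStateSketch {
  // Hypothetical holder of a worker's remaining cores and assigned tasks.
  final class CoreBudget(initialCores: Int) {
    private var available = initialCores
    val tasks: ArrayBuffer[String] = ArrayBuffer.empty[String]

    def coresAvailable: Int = available

    // Check before mutating, so a rejected assignment leaves the state untouched.
    def assignTask(task: String, cpu: Int): Unit = {
      if (available < cpu) {
        throw new IllegalStateException(
          s"Available cores are less than cpu per task ($available < $cpu)")
      }
      tasks += task
      available -= cpu
    }
  }
}
```

Checking before the mutation, instead of asserting afterwards, means a failed call cannot leave a negative core count or a half-recorded task behind.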





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-23 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84619023
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and 
assigned task list. */
+private[scheduler] class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+
+  def assignTask(task: TaskDescription, cpu: Int): Unit = {
+tasks += task
+coresAvailable -= cpu
+assert(coresAvailable >= 0)
+  }
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
when TaskScheduler
+ * performs task assignment given available workers, it first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes multiple rounds to request 
TaskAssigner for task
+ * assignment with different locality restrictions until there is either 
no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize 
the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, 
TaskScheduler invokes the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext/next() is used by 
TaskScheduler
+ * to check the worker availability and retrieve current offering from 
TaskAssigner.
+ *
+ * Fourth, TaskScheduler calls offerAccepted() to notify the TaskAssigner 
so that
+ * TaskAssigner can decide whether the current offer is valid or not for 
the next request.
+ *
+ * Fifth, after task assignment is done, TaskScheduler invokes the 
function tasks to
+ * retrieve all the task assignment information.
+ */
+
+private[scheduler] sealed abstract class TaskAssigner {
+  protected var offer: Seq[OfferState] = _
+  protected var cpuPerTask = 1
+
+  protected def withCpuPerTask(cpuPerTask: Int): TaskAssigner = {
+this.cpuPerTask = cpuPerTask
--- End diff --

You mean cpuPerTask >= 1? I don't think we need this check.





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-21 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84424034
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
when TaskScheduler
+ * perform task assignment given available workers, it first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different locality restrictions until there is either 
no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize 
the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, 
TaskScheduler invoke the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext()/getNext() is 
used by TaskScheduler
+ * to check the worker availability and retrieve current offering from 
TaskAssigner.
+ *
+ * Fourth, then offerAccepted is used by TaskScheduler to notify the 
TaskAssigner so that
+ * TaskAssigner can decide whether the current offer is valid or not for 
the next request.
+ *
+ * Fifth, After task assignment is done, TaskScheduler invokes the tasks() 
to
+ * retrieve all the task assignment information.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  protected var offer: Seq[OfferState] = _
+  protected var cpuPerTask = 1
+
+  protected def withCpuPerTask(cpuPerTask: Int): Unit = {
+this.cpuPerTask = cpuPerTask
+  }
+
+  /** The final assigned offer returned to TaskScheduler. */
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  /** Invoked at the beginning of resource offering to construct the offer 
with the workoffers. */
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = Random.shuffle(workOffer.map(o => new OfferState(o)))
+  }
+
+  /** Invoked at each round of Taskset assignment to initialize the 
internal structure. */
+  def init(): Unit
+
+  /**
+   * Tests Whether there is offer available to be used inside of one round 
of Taskset assignment.
+   *  @return  `true` if a subsequent call to `next` will yield an element,
+   *   `false` otherwise.
+   */
+  def hasNext: Boolean
+
+  /**
+   * Produces next worker offer based on the task assignment strategy.
+   * @return  the next available offer, if `hasNext` is `true`,
+   *  undef

[GitHub] spark issue #15541: [SPARK-17637][Scheduler]Packed scheduling for Spark task...

2016-10-20 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/15541
  
@gatorsmile I didn't see your new comments 





[GitHub] spark issue #15541: [SPARK-17637][Scheduler]Packed scheduling for Spark task...

2016-10-20 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/15541
  
@rxin Can you please take a look and let me know if you have any concerns?





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-19 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84158910
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of a worker: its available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different locality restrictions until there are either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible for maintaining the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
+ * initialize its internal worker states at the beginning of resource offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invokes init() of TaskAssigner to
+ * initialize the data structure for the round. When performing real task assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve
+ * the current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes tasks() to
+ * retrieve all the task assignment information, and eventually invokes the reset() method so that
+ * TaskAssigner can clean up its internally maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+    this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer with the work offers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Whether there is an offer available to be used inside of one round of Taskset assignment.
+  def hasNext: Boolean
+
+  // Returns the next assigned offer based on the task assignment strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that
+  // the assigner can decide whether the current worker is valid for the next offering.
+  def offerAcc
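
The call sequence described in the Scaladoc of the quoted diff (construct, then per round init, hasNext/getNext, offerAccepted, and finally tasks and reset) can be sketched roughly as below. This is only an illustration and not code from the pull request: every `Sketch*` name is hypothetical, the worker and task types are simplified stand-ins for Spark's WorkerOffer and TaskDescription, the in-order strategy is an assumption chosen for brevity, and locality levels are ignored.

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for Spark's WorkerOffer and TaskDescription.
final case class SketchWorkerOffer(executorId: String, host: String, cores: Int)
final case class SketchTask(name: String)

// Mirrors the role of OfferState in the diff: remaining cores plus assigned tasks.
final class SketchOfferState(val workOffer: SketchWorkerOffer) {
  var coresAvailable: Int = workOffer.cores
  val tasks: ArrayBuffer[SketchTask] = ArrayBuffer.empty[SketchTask]
}

// Assumed strategy: walk the workers in the order they were offered.
final class SketchInOrderAssigner {
  private var offer: Seq[SketchOfferState] = Seq.empty
  private var index = 0

  def construct(workOffers: Seq[SketchWorkerOffer]): Unit = {
    offer = workOffers.map(o => new SketchOfferState(o))
  }
  def init(): Unit = { index = 0 }            // start of each round
  def hasNext: Boolean = index < offer.size
  def getNext(): SketchOfferState = {
    val current = offer(index)
    index += 1
    current
  }
  // This simple strategy does not need the feedback; other strategies might.
  def offerAccepted(accepted: Boolean): Unit = ()
  def tasks: Seq[ArrayBuffer[SketchTask]] = offer.map(_.tasks)
  def reset(): Unit = { offer = Seq.empty; index = 0 }
}

// Hypothetical driver playing the TaskScheduler side of the contract.
object SketchScheduler {
  def assign(
      assigner: SketchInOrderAssigner,
      workOffers: Seq[SketchWorkerOffer],
      pending: Seq[SketchTask],
      cpusPerTask: Int = 1): Seq[List[SketchTask]] = {
    val queue = mutable.Queue(pending: _*)
    assigner.construct(workOffers)
    var launched = true
    // Keep running rounds until a round launches nothing or no task is left.
    while (launched && queue.nonEmpty) {
      launched = false
      assigner.init()
      while (assigner.hasNext && queue.nonEmpty) {
        val state = assigner.getNext()
        val accepted = state.coresAvailable >= cpusPerTask
        if (accepted) {
          state.tasks += queue.dequeue()
          state.coresAvailable -= cpusPerTask
          launched = true
        }
        assigner.offerAccepted(accepted)
      }
    }
    val result = assigner.tasks.map(_.toList) // copy before releasing state
    assigner.reset()
    result
  }
}
```

In this sketch the driver gives each worker at most one task per round, so a worker with more free cores only receives additional tasks in later rounds.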

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-19 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84129486
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -109,6 +108,85 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!failedTaskSet)
   }
 
+  test("Scheduler does not always schedule tasks on the same workers") {
+    val taskScheduler = setupScheduler()
+    roundrobin(taskScheduler)
+  }
+
+  test("User can specify the roundrobin task assigner") {
+    val taskScheduler = setupScheduler(("spark.scheduler.taskAssigner", "RoUndrObin"))
+    roundrobin(taskScheduler)
+  }
+
+  test("Fallback to roundrobin when the task assigner provided is not valid") {
+    val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "invalid")
+    roundrobin(taskScheduler)
+  }
+
+  test("Scheduler balance the assignment to the worker with more free cores") {
--- End diff --

Will change the test case name to make it more explicit.
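
The quoted tests imply that the value of `spark.scheduler.taskAssigner` is matched case-insensitively and that an unrecognized value falls back to round robin. A minimal sketch of that lookup is given below; it is not the pull request's implementation. The object name, the `resolve` helper, and the entries "packed" and "balanced" in the set of known names are assumptions made for illustration (only "roundrobin" appears in the quoted tests).

```scala
import java.util.Locale

object AssignerNameSketch {
  // Default confirmed by the quoted tests.
  val Default: String = "roundrobin"

  // Assumed set of accepted names; "packed" and "balanced" are hypothetical.
  val Known: Set[String] = Set(Default, "packed", "balanced")

  // Normalize the configured value and fall back to the default when unknown.
  def resolve(configured: Option[String]): String =
    configured
      .map(_.trim.toLowerCase(Locale.ROOT))
      .filter(Known.contains)
      .getOrElse(Default)
}
```

Lower-casing with `Locale.ROOT` keeps the comparison independent of the JVM's default locale.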





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-19 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84119714
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-19 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84002685
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-19 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84002480
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -109,6 +108,85 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!failedTaskSet)
   }
 
+  test("Scheduler does not always schedule tasks on the same workers") {
+    val taskScheduler = setupScheduler()
+    roundrobin(taskScheduler)
+  }
+
+  test("User can specify the roundrobin task assigner") {
+    val taskScheduler = setupScheduler(("spark.scheduler.taskAssigner", "RoUndrObin"))
+    roundrobin(taskScheduler)
+  }
+
+  test("Fallback to roundrobin when the task assigner provided is not valid") {
+    val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "invalid")
+    roundrobin(taskScheduler)
+  }
+
+  test("Scheduler balance the assignment to the worker with more free cores") {
--- End diff --

Can you please clarify?





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-19 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84002353
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-19 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84002236
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83999756
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83998058
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, when
+ * requested to perform task assignment given available workers, first sorts the candidate
+ * tasksets, and then for each taskset takes a number of rounds to request task assignment
+ * from TaskAssigner with different locality restrictions, until there are either no
+ * qualified workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible for maintaining the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
+ * initialize its internal worker states at the beginning of resource offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invokes the init() of TaskAssigner to
+ * initialize the data structure for the round. When performing real task assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve
+ * the current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes tasks() to
+ * retrieve all the task assignment information, and eventually invokes the reset() method so that
+ * TaskAssigner can clean up its internally maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+    this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer with the work offers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Whether there is an offer available to be used within one round of taskset assignment.
+  def hasNext: Boolean
+
+  // Returns the next assigned offer based on the task assignment strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that
+  // the assigner can decide whether the current worker is valid for the next offering.
+  def offerAcc
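
The call sequence described in the doc comment above (construct, then per round init, hasNext/getNext, offerAccepted, and finally tasks and reset) can be sketched roughly as follows. This is only an illustration under stated assumptions: `Offer`, `SimpleAssigner`, and `AssignerProtocol.schedule` are hypothetical stand-ins, tasks are plain strings, and locality is ignored; it is not the code from the pull request.

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for OfferState: an executor id, its free cores, and assigned tasks.
final class Offer(val executorId: String, var coresAvailable: Int) {
  val tasks = new ArrayBuffer[String]()
}

// Hypothetical round-robin assigner that follows the contract from the doc comment.
final class SimpleAssigner {
  private var offers: Seq[Offer] = Seq.empty
  private var i = 0

  def construct(workers: Seq[(String, Int)]): Unit = {
    offers = workers.map { case (id, cores) => new Offer(id, cores) }
  }
  def init(): Unit = { i = 0 }
  def hasNext: Boolean = i < offers.size
  def getNext(): Offer = offers(i)
  // Round robin moves to the next offer whether or not the current one was accepted.
  def offerAccepted(accepted: Boolean): Unit = { i += 1 }
  def tasks: Seq[Seq[String]] = offers.map(_.tasks.toList)
  def reset(): Unit = { offers = Seq.empty; i = 0 }
}

object AssignerProtocol {
  // Plays the scheduler's role: construct once, run rounds until nothing more can be placed.
  def schedule(
      workers: Seq[(String, Int)],
      pending: Seq[String],
      cpusPerTask: Int): Seq[Seq[String]] = {
    val assigner = new SimpleAssigner
    val queue = mutable.Queue.empty[String]
    queue ++= pending
    assigner.construct(workers)
    var launched = true
    while (launched && queue.nonEmpty) {
      launched = false
      assigner.init()
      while (assigner.hasNext && queue.nonEmpty) {
        val offer = assigner.getNext()
        val accepted = offer.coresAvailable >= cpusPerTask
        if (accepted) {
          offer.tasks += queue.dequeue()
          offer.coresAvailable -= cpusPerTask
          launched = true
        }
        assigner.offerAccepted(accepted)
      }
    }
    val result = assigner.tasks
    assigner.reset()
    result
  }
}
```

With workers `a` (2 cores) and `b` (1 core) and four one-core tasks, the sketch places two tasks on `a`, one on `b`, and leaves the fourth unassigned once no offer has cores left.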

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83997070
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---

[GitHub] spark issue #15541: [SPARK-17637][Scheduler]Packed scheduling for Spark task...

2016-10-18 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/15541
  
@rxin @gatorsmile Could you please take a look and share your comments?





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread zhzhan
GitHub user zhzhan opened a pull request:

https://github.com/apache/spark/pull/15541

[SPARK-17637][Scheduler]Packed scheduling for Spark tasks across executors

## What changes were proposed in this pull request?

Restructure the code and implement two new task assigners.

PackedAssigner: tries to allocate tasks to the executors with the fewest
available cores, so that Spark can release reserved executors when dynamic
allocation is enabled.

BalancedAssigner: tries to allocate tasks to the executors with the most
available cores in order to balance the workload across all executors.

By default, the original round-robin assigner is used.

We tested a pipeline, and the new PackedAssigner saves around 45% of the
reserved CPU and memory with dynamic allocation enabled.
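
The difference between the two policies can be illustrated with a small sketch. It assumes a simplified model in which an executor is just an id plus its free cores; `Exec`, `pickPacked`, `pickBalanced`, and `place` are hypothetical names, not the classes in this pull request.

```scala
// Hypothetical, simplified model: an executor is an id and its number of free cores.
final case class Exec(id: String, freeCores: Int)

object AssignPolicies {
  // Packed: among executors that can fit the task, prefer the one with the fewest free cores.
  def pickPacked(execs: Seq[Exec], cpusPerTask: Int): Option[Exec] = {
    val fit = execs.filter(_.freeCores >= cpusPerTask)
    if (fit.isEmpty) None else Some(fit.minBy(_.freeCores))
  }

  // Balanced: among executors that can fit the task, prefer the one with the most free cores.
  def pickBalanced(execs: Seq[Exec], cpusPerTask: Int): Option[Exec] = {
    val fit = execs.filter(_.freeCores >= cpusPerTask)
    if (fit.isEmpty) None else Some(fit.maxBy(_.freeCores))
  }

  // Places tasks one at a time and returns how many tasks each executor received.
  def place(execs: Seq[Exec], nTasks: Int, cpusPerTask: Int)(
      pick: (Seq[Exec], Int) => Option[Exec]): Map[String, Int] = {
    var state = execs
    var counts = execs.map(e => e.id -> 0).toMap
    var remaining = nTasks
    var stuck = false
    while (remaining > 0 && !stuck) {
      pick(state, cpusPerTask) match {
        case Some(chosen) =>
          // Deduct the cores used by the task from the chosen executor.
          state = state.map { e =>
            if (e.id == chosen.id) e.copy(freeCores = e.freeCores - cpusPerTask) else e
          }
          counts = counts.updated(chosen.id, counts(chosen.id) + 1)
          remaining -= 1
        case None =>
          stuck = true // no executor has enough free cores left
      }
    }
    counts
  }
}
```

With three executors of 4 free cores each and three one-core tasks, the packed policy puts all three tasks on a single executor and leaves two idle (candidates for release under dynamic allocation), while the balanced policy gives one task to each.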

## How was this patch tested?

Tested with unit tests in TaskSchedulerImplSuite and manual tests in a
production pipeline.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhzhan/spark TaskAssigner

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15541.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15541


commit 75cdd1a77a227fa492a09e93794d4ea7be8a020f
Author: Zhan Zhang <zhanzh...@fb.com>
Date:   2016-10-19T01:20:48Z

TaskAssigner to support different scheduling algorithms







[GitHub] spark issue #15218: [SPARK-17637][Scheduler]Packed scheduling for Spark task...

2016-10-18 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/15218
  
@wangmiao1981 Thanks for reviewing this. I will open another PR addressing
these comments soon.





[GitHub] spark issue #15218: [SPARK-17637][Scheduler]Packed scheduling for Spark task...

2016-10-16 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/15218
  
@rxin Thanks a lot for the detailed review. I will update the patch.





[GitHub] spark issue #15218: [SPARK-17637][Scheduler]Packed scheduling for Spark task...

2016-10-15 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/15218
  
@mridulm  Thanks for reviewing this.





[GitHub] spark issue #15218: [SPARK-17637][Scheduler]Packed scheduling for Spark task...

2016-10-09 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/15218
  
@mridulm You are right. This patch mainly targets jobs that have multiple
stages, which are very common in production pipelines. As you mentioned, if a
shuffle is involved, getLocationsWithLargestOutputs in MapOutputTracker
typically returns None for ShuffledRowRDD and ShuffledRDD because of the
threshold REDUCER_PREF_LOCS_FRACTION (20%).

In real production pipelines a ShuffledRowRDD/ShuffledRDD can easily have more
than 10 partitions (even hundreds), so the patch does help a lot with CPU
reservation time.
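
The effect of the 20% threshold can be illustrated with a simplified model. The sketch below is not MapOutputTracker's implementation; `ReducerLocality.preferredHosts` and its host-to-bytes input are assumptions made for illustration. It only shows why evenly spread shuffle output yields no preferred location.

```scala
object ReducerLocality {
  // Assumed to mirror the 20% threshold mentioned above.
  val ReducerPrefLocsFraction = 0.2

  // bytesByHost: map output bytes destined for one reducer, grouped by host.
  // Returns the hosts holding at least `threshold` of the total, or None if no host qualifies.
  def preferredHosts(
      bytesByHost: Map[String, Long],
      threshold: Double = ReducerPrefLocsFraction): Option[Seq[String]] = {
    val total = bytesByHost.values.sum
    if (total <= 0L) {
      None
    } else {
      val hosts = bytesByHost.collect {
        case (host, bytes) if bytes.toDouble / total >= threshold => host
      }.toSeq.sorted
      if (hosts.isEmpty) None else Some(hosts)
    }
  }
}
```

When the output is spread evenly over ten hosts, each holds 10% and the result is None, so the reducer task has no locality preference and the assignment policy decides where it runs.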

 





[GitHub] spark issue #15218: [SPARK-17637][Scheduler]Packed scheduling for Spark task...

2016-10-07 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/15218
  
@mridulm Thanks for the comments. Your concern regarding locality is right.
The patch does not change this behavior: locality preference still takes
priority. But if multiple executors satisfy the locality restriction, the
policy is applied among them. In our production pipeline, we do see a big gain
in reserved CPU resources when dynamic allocation is enabled.

@kayousterhout Would you like to take a look and provide your comments?
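
A rough sketch of "locality first, policy second" follows. It is a deliberate simplification: Spark's real scheduler walks through locality levels with delay scheduling, which is not modeled here, and `Candidate` and `LocalityThenPolicy.choose` are hypothetical names.

```scala
// Hypothetical candidate executor: id, host, and free cores.
final case class Candidate(id: String, host: String, freeCores: Int)

object LocalityThenPolicy {
  // Locality restricts the candidate pool; the packed policy only orders within that pool.
  def choose(
      candidates: Seq[Candidate],
      preferredHosts: Set[String],
      cpusPerTask: Int): Option[Candidate] = {
    val fit = candidates.filter(_.freeCores >= cpusPerTask)
    val local = fit.filter(c => preferredHosts.contains(c.host))
    // Fall back to non-local executors only when no local executor can take the task.
    val pool = if (local.nonEmpty) local else fit
    if (pool.isEmpty) None else Some(pool.minBy(_.freeCores))
  }
}
```

Here an executor on a preferred host wins even if a non-local executor has fewer free cores; the packed ordering only decides between executors that are equally acceptable from a locality point of view.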





[GitHub] spark pull request #15218: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-06 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15218#discussion_r82321008
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.SparkConf
+
+case class OfferState(workOffer: WorkerOffer, var cores: Int) {
+  // Build a list of tasks to assign to each worker.
+  val tasks = new ArrayBuffer[TaskDescription](cores)
+}
+
+abstract class TaskAssigner(conf: SparkConf) {
+  var offer: Seq[OfferState] = _
+  val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
+
+  // The final assigned offer returned to TaskScheduler.
+  def tasks(): Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Constructs the assigner from the work offers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => OfferState(o, o.cores))
+  }
+
+  // Invoked in each round of taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Indicates whether there is an offer available to be used by one round of taskset assignment.
+  def hasNext(): Boolean
+
+  // Next available offer returned to one round of taskset assignment.
+  def getNext(): OfferState
+
+  // Called by the TaskScheduler to indicate whether the current offer is accepted,
+  // in order to decide whether the current offer is valid for the next offering.
+  def taskAssigned(assigned: Boolean): Unit
+
+  // Releases internally maintained resources. Subclasses are responsible for
+  // releasing their own private resources.
+  def reset: Unit = {
+    offer = null
+  }
+}
+
+class RoundRobinAssigner(conf: SparkConf) extends TaskAssigner(conf) {
+  var i = 0
+  override def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = Random.shuffle(workOffer.map(o => OfferState(o, o.cores)))
+  }
+  override def init(): Unit = {
+    i = 0
+  }
+  override def hasNext: Boolean = {
+    i < offer.size
+  }
+  override def getNext(): OfferState = {
+    offer(i)
+  }
+  override def taskAssigned(assigned: Boolean): Unit = {
+    i += 1
+  }
+  override def reset: Unit = {
+    super.reset
+    i = 0
+  }
+}
+
+class BalancedAssigner(conf: SparkConf) extends TaskAssigner(conf) {
--- End diff --

@mridulm Thanks for the comments, but I am lost here. My understanding is
that, ordering-wise, x is equal to y if x.cores == y.cores. This ordering is
used by the priority queue to construct its data structure. Following is an
example from the documentation of trait Ordering: PersonA will be equal to
PersonB if they are the same age. Am I missing anything?

    import scala.util.Sorting

    case class Person(name: String, age: Int)
    val people = Array(Person("bob", 30), Person("ann", 32), Person("carl", 19))

    // sort by age
    object AgeOrdering extends Ordering[Person] {
      def compare(a: Person, b: Person) = a.age compare b.age
    }
    Sorting.quickSort(people)(AgeOrdering)
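
The point about ordering equality versus object equality can be checked with a small sketch. `Slot` and `CoreOrdering` are hypothetical stand-ins for OfferState and the ordering used in the patch; only `scala.collection.mutable.PriorityQueue` and `Ordering` are standard library types.

```scala
import scala.collection.mutable.PriorityQueue

// Hypothetical simplified offer state: a worker id and its remaining cores.
final case class Slot(id: String, cores: Int)

// Orders slots by cores only; distinct slots with equal cores compare as 0.
object CoreOrdering extends Ordering[Slot] {
  def compare(x: Slot, y: Slot): Int = Integer.compare(x.cores, y.cores)
}

object OrderingDemo {
  // Dequeues every slot and returns the core counts in dequeue order (largest first).
  def drain(slots: Seq[Slot]): Seq[Int] = {
    val pq = PriorityQueue.empty[Slot](CoreOrdering)
    pq ++= slots
    val out = Seq.newBuilder[Int]
    while (pq.nonEmpty) {
      out += pq.dequeue().cores
    }
    out.result()
  }
}
```

Two slots with the same core count are equivalent under the ordering while remaining different objects under `==`. The queue still dequeues by core count; which of the tied slots comes out first is left to the queue's implementation.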





[GitHub] spark pull request #15218: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-06 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15218#discussion_r82290564
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,151 @@
+class BalancedAssigner(conf: SparkConf) extends TaskAssigner(conf) {
--- End diff --

BTW, I don't think we need to handle the case of x.cores == y.cores. It means
the two are equal, and the behavior then depends on the priority queue's
algorithm.





[GitHub] spark issue #15218: [SPARK-17637][Scheduler]Packed scheduling for Spark task...

2016-10-04 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/15218
  
@mridulm Thanks for reviewing this. I will wait a while in case there are
more comments before addressing them.





[GitHub] spark issue #15218: [SPARK-17637][Scheduler]Packed scheduling for Spark task...

2016-09-23 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/15218
  
@gatorsmile  Thanks. #65832 is the latest one which does not have the same 
failure.





[GitHub] spark issue #15218: [SPARK-17637][Scheduler]Packed scheduling for Spark task...

2016-09-23 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/15218
  
retest please





[GitHub] spark issue #15218: [SPARK-17637][Scheduler]Packed scheduling for Spark task...

2016-09-23 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/15218
  
Failed in DirectKafkaStreamSuite. It should have nothing to do with the
patch.






[GitHub] spark pull request #15218: [Spark-17637][Scheduler]Packed scheduling for Spa...

2016-09-23 Thread zhzhan
GitHub user zhzhan opened a pull request:

https://github.com/apache/spark/pull/15218

[Spark-17637][Scheduler]Packed scheduling for Spark tasks across executors

## What changes were proposed in this pull request?

Restructure the code and implement two new task assigners.

PackedAssigner: tries to allocate tasks to the executors with the fewest
available cores, so that Spark can release reserved executors when dynamic
allocation is enabled.

BalancedAssigner: tries to allocate tasks to the executors with the most
available cores in order to balance the workload across all executors.

By default, the original round-robin assigner is used.

We tested a pipeline, and the new PackedAssigner saves around 45% of the
reserved CPU and memory with dynamic allocation enabled.


## How was this patch tested?

Tested with unit tests in TaskSchedulerImplSuite and manual tests in a
production pipeline.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhzhan/spark packed-scheduler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15218.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15218


commit 97ee760f8acdacf73e7b8c9a1c65578821efb05c
Author: Zhan Zhang <zhanzh...@fb.com>
Date:   2016-09-18T23:16:22Z

enable multiple task-worker allocation scheduling

commit 3f094cf25a6bb7cb50365d47cd00fb84340d8c6c
Author: Zhan Zhang <zhanzh...@fb.com>
Date:   2016-09-18T23:21:09Z

fix the configuration.md

commit c3ebf9ca84f23d7c150cd1abc69955a7a62678ba
Author: Zhan Zhang <zhanzh...@fb.com>
Date:   2016-09-23T03:23:38Z

formatting and change test cases







[GitHub] spark issue #15080: [SPARK-17526][Web UI]: Display the executor log links wi...

2016-09-14 Thread zhzhan
Github user zhzhan commented on the issue:

https://github.com/apache/spark/pull/15080
  
@srowen Thanks for reviewing this. Any suggestions to improve it are
welcome. Not being able to locate the debug log quickly in production bothers
us a lot.





[GitHub] spark pull request #15080: SPARK-17526: add log links in job failures

2016-09-13 Thread zhzhan
GitHub user zhzhan opened a pull request:

https://github.com/apache/spark/pull/15080

SPARK-17526: add log links in job failures

## What changes were proposed in this pull request?
Add the executor log links to the job failure message on the Spark UI and
console.

## How was this patch tested?

The patch is manually tested. 

In Console:

ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; stdout: stdout_log_link stderr: stderr_log_link; aborting job ...

In WebUI 

![screen shot 2016-09-13 at 9 33 06 am](https://cloud.githubusercontent.com/assets/4451616/18482552/72dd572a-7995-11e6-98be-9324f3750305.png)






You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhzhan/spark debug-log

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15080.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15080


commit 19fa189acfcaadcb1410564d1cb178076ac60406
Author: Zhan Zhang <zhanzh...@fb.com>
Date:   2016-09-13T16:30:45Z

SPARK-17526: add log links in job failures







[GitHub] spark pull request: [SPARK-15441][SQL] support null object in oute...

2016-05-30 Thread zhzhan
Github user zhzhan commented on the pull request:

https://github.com/apache/spark/pull/13322#issuecomment-222432192
  
My understanding is that this newly added hidden column is mainly for 
serializing and deserializing objects to/from rows. How would you leverage it 
to solve the outer join case, where the null object is actually added during 
query execution?





[GitHub] spark pull request: SPARK-12417. [SQL] Orc bloom filter options ar...

2015-12-18 Thread zhzhan
Github user zhzhan commented on the pull request:

https://github.com/apache/spark/pull/10375#issuecomment-165844369
  
Are there any test cases to make sure it works as expected? Would you mind 
enabling ORC PPD by default here, or doing that in another JIRA?





[GitHub] spark pull request: [SPARK-11562][SQL] Provide user an option to i...

2015-11-08 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9553#discussion_r44243045
  
--- Diff: repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala 
---
@@ -78,16 +79,21 @@ object Main extends Logging {
 }
 sparkContext = new SparkContext(conf)
 logInfo("Created spark context..")
+sqlContextWithHive = conf.getBoolean("spark.sql.hive.context", true)
 sparkContext
   }
 
   def createSQLContext(): SQLContext = {
-val name = "org.apache.spark.sql.hive.HiveContext"
+val name = sqlContextWithHive match {
+  case true => "org.apache.spark.sql.hive.HiveContext"
+  case false => "org.apache.spark.sql.SQLContext"
+}
+
 val loader = Utils.getContextOrSparkClassLoader
 try {
   sqlContext = 
loader.loadClass(name).getConstructor(classOf[SparkContext])
 .newInstance(sparkContext).asInstanceOf[SQLContext]
-  logInfo("Created sql context (with Hive support)..")
+  logInfo("Created sql context with " + name)
 } catch {
   case _: java.lang.ClassNotFoundException | _: 
java.lang.NoClassDefFoundError =>
 sqlContext = new SQLContext(sparkContext)
--- End diff --

I don't know how to fix this, but the logic seems odd: if SQLContext is 
selected and loading it fails, the exception handler creates a SQLContext again.
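
One way to read the concern above, as a minimal sketch: falling back to a plain SQLContext only makes sense when the Hive context was the one requested. The helper names (`chooseContextClass`, `createContext`) and the `load` function are hypothetical stand-ins for the reflective construction in the diff, not Spark APIs; only the two class names and the `spark.sql.hive.context` key come from the diff itself.

```scala
object ContextSelection {
  val HiveContextClass = "org.apache.spark.sql.hive.HiveContext"
  val SqlContextClass = "org.apache.spark.sql.SQLContext"

  // Read the flag where it is needed instead of caching it in a separate var.
  def chooseContextClass(conf: Map[String, String]): String =
    if (conf.getOrElse("spark.sql.hive.context", "true").toBoolean) HiveContextClass
    else SqlContextClass

  // `load` stands in for reflective construction and may throw when the class is missing.
  // The fallback runs only when the Hive context was requested, so a plain
  // SQLContext is never attempted twice.
  def createContext[T](conf: Map[String, String])(load: String => T): T = {
    val name = chooseContextClass(conf)
    try {
      load(name)
    } catch {
      case _: ClassNotFoundException | _: NoClassDefFoundError if name == HiveContextClass =>
        load(SqlContextClass)
    }
  }
}
```

With this shape, a failure to load the plain SQLContext propagates to the caller instead of being retried with the same class.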





[GitHub] spark pull request: [SPARK-11562][SQL] Provide user an option to i...

2015-11-08 Thread zhzhan
Github user zhzhan commented on the pull request:

https://github.com/apache/spark/pull/9553#issuecomment-154943558
  
The documentation needs an update for this new configuration.





[GitHub] spark pull request: [SPARK-11562][SQL] Provide user an option to i...

2015-11-08 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9553#discussion_r44242983
  
--- Diff: 
repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala ---
@@ -132,6 +132,7 @@ class SparkILoop(
   @DeveloperApi
   var sparkContext: SparkContext = _
   var sqlContext: SQLContext = _
+  var sqlContextWithHive: Boolean = _
--- End diff --

I don't think we need an extra var for this; we can read the config 
directly, since nothing else uses it.





[GitHub] spark pull request: [SPARK-11265] [YARN] YarnClient can't get toke...

2015-10-27 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9232#discussion_r43204781
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -142,6 +145,97 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
 val containerIdString = 
System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
 ConverterUtils.toContainerId(containerIdString)
   }
+
+  /**
+   * Obtains token for the Hive metastore, using the current user as the 
principal.
+   * Some exceptions are caught and downgraded to a log message.
+   * @param conf hadoop configuration; the Hive configuration will be 
based on this
+   * @return a token, or `None` if there's no need for a token (no 
metastore URI or principal
+   * in the config), or if a binding exception was caught and 
downgraded.
+   */
+  def obtainTokenForHiveMetastore(conf: Configuration): 
Option[Token[DelegationTokenIdentifier]] = {
+try {
+  obtainTokenForHiveMetastoreInner(conf, 
UserGroupInformation.getCurrentUser().getUserName)
+} catch {
+  case e: Exception => {
+handleTokenIntrospectionFailure("Hive", e)
+None
+  }
+}
+  }
+
+  /**
+   * Handle failures to obtain a token through introspection. Failures to 
load the class are
+   * not treated as errors: anything else is.
+   * @param service service name for error messages
+   * @param thrown exception caught
+   * @throws Exception if the `thrown` exception isn't one that is to be 
ignored
+   */
+  private[yarn] def handleTokenIntrospectionFailure(service: String, 
thrown: Throwable): Unit = {
+thrown match {
+  case e: ClassNotFoundException =>
+logInfo(s"$service class not found $e")
+logDebug("Hive Class not found", e)
+  case t: Throwable => {
+throw t
--- End diff --

Here the exception is thrown. I know swallowing the exception is bad, but what 
happens if the user does not want to access the Hive metastore and still wants 
to use Spark even if the token cannot be acquired? 
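
To make the question concrete, here is a rough sketch of one possible policy, not what the PR implements. The `required` switch, the `warn` callback, and `obtainToken` itself are hypothetical names introduced only for illustration: a missing class is always downgraded, other non-fatal failures are downgraded only when the caller says the token is optional, and everything else propagates.

```scala
import scala.util.control.NonFatal

object TokenFailurePolicy {
  // `required` is a hypothetical switch, not an existing Spark setting.
  // `fetch` stands in for the real token lookup and may throw.
  def obtainToken[T](service: String, required: Boolean, warn: String => Unit)(
      fetch: => T): Option[T] = {
    try {
      Some(fetch)
    } catch {
      case e: ClassNotFoundException =>
        // The service is not on the classpath: never treated as an error.
        warn(s"$service class not found: $e")
        None
      case NonFatal(e) if !required =>
        // The caller declared the token optional: log and continue without it.
        warn(s"Could not obtain $service token, continuing without it: $e")
        None
    }
  }
}
```

When `required` is true, any failure other than a missing class is rethrown unchanged, which matches the behaviour in the diff.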





[GitHub] spark pull request: [SPARK-10623] [SQL] Fixes ORC predicate push-d...

2015-09-18 Thread zhzhan
Github user zhzhan commented on the pull request:

https://github.com/apache/spark/pull/8799#issuecomment-141355072
  
LGTM. Thanks for fixing this. 





[GitHub] spark pull request: [SPARK-10623][SQL]: fix the predicate pushdown...

2015-09-17 Thread zhzhan
Github user zhzhan closed the pull request at:

https://github.com/apache/spark/pull/8783





[GitHub] spark pull request: [SPARK-10623][SQL]: fix the predicate pushdown...

2015-09-17 Thread zhzhan
Github user zhzhan commented on the pull request:

https://github.com/apache/spark/pull/8783#issuecomment-141215436
  
@liancheng  Thanks for the review. Since 
https://github.com/apache/spark/pull/8799 has been opened and also fixes 
another issue, I will close this one.





[GitHub] spark pull request: [SPARK-10623][SQL]: fix the predicate pushdown...

2015-09-16 Thread zhzhan
GitHub user zhzhan opened a pull request:

https://github.com/apache/spark/pull/8783

[SPARK-10623][SQL]: fix the predicate pushdown construction

The predicate pushdown is not working because the construction is wrong. 
Fix it with startAnd/end.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhzhan/spark SPARK-10623

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/8783.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #8783


commit 6fcc9b1e8c1cac7a020fbb79fd9662b0126804d1
Author: Zhan Zhang <zzhang@hw11188.local>
Date:   2015-09-16T21:44:15Z

SPARK-10623: fix the predicate pushdown construction







[GitHub] spark pull request: [SPARK-10623][SQL]: fix the predicate pushdown...

2015-09-16 Thread zhzhan
Github user zhzhan commented on the pull request:

https://github.com/apache/spark/pull/8783#issuecomment-140941698
  
@liancheng  @marmbrus  Can you help to review it?





[GitHub] spark pull request: [SPARK-10304][SQL]: throw error when the table...

2015-09-01 Thread zhzhan
Github user zhzhan commented on the pull request:

https://github.com/apache/spark/pull/8547#issuecomment-136610247
  
Adding a PartitionValues.empty does not cover all the problems. I will close 
this PR and investigate other approaches.





[GitHub] spark pull request: [SPARK-10304][SQL]: throw error when the table...

2015-09-01 Thread zhzhan
Github user zhzhan closed the pull request at:

https://github.com/apache/spark/pull/8547





[GitHub] spark pull request: [SPARK-10304][SQL]: throw error when the table...

2015-09-01 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/8547#discussion_r38391122
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala ---
@@ -436,7 +436,8 @@ abstract class HadoopFsRelation 
private[sql](maybePartitionSpec: Option[Partitio
   Try(fs.listStatus(qualified)).getOrElse(Array.empty)
 }.filterNot { status =>
   val name = status.getPath.getName
-  name.toLowerCase == "_temporary" || name.startsWith(".")
+  // Is it safe to replace "_temporary" to "_"?
--- End diff --

Thanks for the comments. Judging from the test cases, there seem to be a lot 
of corner cases to cover. For example, the 1st listing is valid, but the 2nd 
is not: 
1st:
  "hdfs://host:9000/path/_temporary",
  "hdfs://host:9000/path/a=10/b=20",
  "hdfs://host:9000/path/_temporary/path",
2nd:
  "hdfs://host:9000/path/_temporary",
  "hdfs://host:9000/path/a=10/b=20",
  "hdfs://host:9000/path/path1",
Adding a PartitionValues.empty does not solve the problem. I will close this 
PR and investigate other approaches.
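
The all-or-nothing rule discussed in this thread could be sketched roughly as follows. This is a simplified illustration, not the PR's code: `isValidTableLayout` is a hypothetical helper, paths are handled as plain strings relative to a base path, and any path with a component starting with `_` or `.` is assumed to be ignorable. It only reproduces the expected outcome for the two listings quoted above and does not claim to cover the remaining corner cases.

```scala
object TableLayoutCheck {
  // Path components relative to the table's base path.
  private def components(base: String, path: String): Seq[String] =
    path.stripPrefix(base).split("/").filter(_.nonEmpty).toSeq

  // Assumption: anything under a component starting with "_" or "." is ignored.
  private def isHidden(parts: Seq[String]): Boolean =
    parts.exists(p => p.startsWith("_") || p.startsWith("."))

  // A path counts as partitioned when every component looks like key=value.
  private def isPartitioned(parts: Seq[String]): Boolean =
    parts.nonEmpty && parts.forall(_.matches("[^=]+=[^=]+"))

  // Valid when the visible paths are either all partitioned or all unpartitioned.
  def isValidTableLayout(base: String, paths: Seq[String]): Boolean = {
    val visible = paths.map(components(base, _)).filterNot(isHidden)
    visible.forall(isPartitioned) || !visible.exists(isPartitioned)
  }
}
```

Mixing `a=10/b=20` with an unpartitioned `path1` makes the second listing fail the check, while the first listing passes because both `_temporary` entries are skipped.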





[GitHub] spark pull request: [SPARK-10304][SQL]: throw error when the table...

2015-08-31 Thread zhzhan
GitHub user zhzhan opened a pull request:

https://github.com/apache/spark/pull/8547

[SPARK-10304][SQL]: throw error when the table directory is invalid

Throw an error if the directory of a table is invalid. A directory is valid 
when either all files in it are partitioned or none of them are.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhzhan/spark SPARK-10304

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/8547.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #8547


commit be5522d24a39da20fc8d08b13e15def8e9139cc3
Author: Zhan Zhang <zzhang@hw11188.local>
Date:   2015-09-01T05:01:18Z

SPARK-10304: throw error when the table directory is invalid







[GitHub] spark pull request: [SPARK-9170][SQL] Instead of StandardStructObj...

2015-08-27 Thread zhzhan
Github user zhzhan commented on the pull request:

https://github.com/apache/spark/pull/7520#issuecomment-135323382
  
@viirya I took a quick second look at the issue. As @chenghao-intel 
mentioned, normalizing the name (to lower case) is the default behavior, so 
should we fix it in the following place (in StructType.scala) instead? I am 
probably missing something. 
  def apply(name: String): StructField = {
    nameToField.getOrElse(name.toLowerCase,
      throw new IllegalArgumentException(s"Field $name does not exist."))
  }





[GitHub] spark pull request: [SPARK-9170][SQL] Instead of StandardStructObj...

2015-08-27 Thread zhzhan
Github user zhzhan commented on the pull request:

https://github.com/apache/spark/pull/7520#issuecomment-135324209
  
Also we need to change:
  private lazy val nameToField: Map[String, StructField] =
    fields.map(f => f.name.toLowerCase -> f).toMap
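
As a rough sketch of the idea in this thread, a case-insensitive field lookup would normalize names both when the map is built and when it is queried. `Field` and `Schema` below are simplified stand-ins for `StructField` and `StructType`, not Spark's classes.

```scala
final case class Field(name: String, dataType: String)

final class Schema(fields: Seq[Field]) {
  // Keys are lower-cased once, when the map is built.
  private lazy val nameToField: Map[String, Field] =
    fields.map(f => f.name.toLowerCase -> f).toMap

  // Lookups lower-case the requested name so "ID" and "id" resolve to the same field.
  def apply(name: String): Field =
    nameToField.getOrElse(name.toLowerCase,
      throw new IllegalArgumentException(s"Field $name does not exist."))
}
```

One consequence of this design is that two fields whose names differ only by case collapse into a single map entry, which is worth keeping in mind when deciding where the normalization belongs.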





[GitHub] spark pull request: [SPARK-9170][SQL] User-provided columns should...

2015-08-27 Thread zhzhan
Github user zhzhan commented on the pull request:

https://github.com/apache/spark/pull/7520#issuecomment-135332239
  
@liancheng has more insight into this part.





[GitHub] spark pull request: [SPARK-9613] [CORE] [WIP] Ban use of JavaConve...

2015-08-17 Thread zhzhan
Github user zhzhan commented on the pull request:

https://github.com/apache/spark/pull/8033#issuecomment-131959546
  
@srowen  It seems that the mapping got messed up. I don't have a clue yet 
and haven't found any obvious reason why the patch would break the test. I 
will dig more and let you know if I have any new findings.





[GitHub] spark pull request: [SPARK-9613] [CORE] [WIP] Ban use of JavaConve...

2015-08-17 Thread zhzhan
Github user zhzhan commented on the pull request:

https://github.com/apache/spark/pull/8033#issuecomment-131960610
  
@srowen You can probably revert the change in 
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala 





[GitHub] spark pull request: [SPARK-9170][SQL] Instead of StandardStructObj...

2015-08-03 Thread zhzhan
Github user zhzhan commented on the pull request:

https://github.com/apache/spark/pull/7520#issuecomment-127465813
  
LGTM. Will let @liancheng take a final look. 





[GitHub] spark pull request: [SPARK-9170][SQL] Instead of StandardStructObj...

2015-07-23 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/7520#discussion_r35396110
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala ---
@@ -86,19 +86,10 @@ private[orc] class OrcOutputWriter(
   TypeInfoUtils.getTypeInfoFromTypeString(
 HiveMetastoreTypes.toMetastoreType(dataSchema))
 
-TypeInfoUtils
-  .getStandardJavaObjectInspectorFromTypeInfo(typeInfo)
-  .asInstanceOf[StructObjectInspector]
+OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo])
+  .asInstanceOf[SettableStructObjectInspector]
--- End diff --

Is it safe to cast to SettableStructObjectInspector? The function 
signature is 
 ObjectInspector createObjectInspector, although the current 
implementations may all return a SettableStructObjectInspector.
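
A minimal sketch of a more defensive alternative to the unchecked cast, using local stand-in traits rather than Hive's real inspector classes (`BaseInspector`, `SettableInspector`, and `asSettable` are hypothetical names):

```scala
// Local stand-ins for the Hive inspector hierarchy; not the real classes.
trait BaseInspector
trait SettableInspector extends BaseInspector
final class OrcLikeInspector extends SettableInspector
final class ReadOnlyInspector extends BaseInspector

object InspectorCast {
  // Checks the runtime type explicitly and fails with a descriptive message,
  // instead of relying on asInstanceOf and a bare ClassCastException.
  def asSettable(oi: BaseInspector): SettableInspector = oi match {
    case settable: SettableInspector => settable
    case other =>
      throw new IllegalStateException(
        s"Expected a settable inspector but got ${other.getClass.getName}")
  }
}
```

The behaviour is the same as the cast whenever the factory returns a settable inspector; the difference only shows up if a future implementation returns something else.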





[GitHub] spark pull request: [SPARK-9170][SQL] Instead of StandardStructObj...

2015-07-23 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/7520#discussion_r35395830
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala ---
@@ -120,15 +111,11 @@ private[orc] class OrcOutputWriter(
   }
 
   override def write(row: Row): Unit = {
-var i = 0
-while (i < row.length) {
-  reusableOutputBuffer(i) = wrappers(i)(row(i))
-  i += 1
-}
+val orcRow = wrap(row, structOI)
--- End diff --

Looks like this call will create a new object for each row written instead 
of reusing reusableOutputBuffer. Is that a concern?
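
The trade-off raised here can be illustrated with a small sketch. It does not use the ORC or Catalyst types; `wrapFresh` and `wrapInto` are hypothetical helpers, with `wrapInto` mirroring the removed while loop that filled a reusable buffer.

```scala
object RowBufferReuse {
  type Wrapper = Any => Any

  // Allocates a new output array for every row.
  def wrapFresh(row: IndexedSeq[Any], wrappers: IndexedSeq[Wrapper]): Array[Any] =
    Array.tabulate[Any](row.length)(i => wrappers(i)(row(i)))

  // Overwrites a buffer owned by the writer, so no per-row array is allocated.
  def wrapInto(row: IndexedSeq[Any], wrappers: IndexedSeq[Wrapper], buffer: Array[Any]): Unit = {
    var i = 0
    while (i < row.length) {
      buffer(i) = wrappers(i)(row(i))
      i += 1
    }
  }
}
```

Reusing the buffer is only safe if the consumer finishes with one row before the next write overwrites it, so the choice depends on how the downstream writer holds on to the row object.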





[GitHub] spark pull request: [SPARK-9170][SQL] Instead of StandardStructObj...

2015-07-23 Thread zhzhan
Github user zhzhan commented on the pull request:

https://github.com/apache/spark/pull/7520#issuecomment-124335510
  
LGTM with the comments answered or resolved.





[GitHub] spark pull request: [SPARK-9170][SQL] Instead of StandardStructObj...

2015-07-23 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/7520#discussion_r35337326
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala ---
@@ -85,18 +85,11 @@ private[orc] class OrcOutputWriter(
   TypeInfoUtils.getTypeInfoFromTypeString(
 HiveMetastoreTypes.toMetastoreType(dataSchema))
 
-TypeInfoUtils
-  .getStandardJavaObjectInspectorFromTypeInfo(typeInfo)
-  .asInstanceOf[StructObjectInspector]
+OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo])
+  .asInstanceOf[SettableStructObjectInspector]
   }
 
-  // Used to hold temporary `Writable` fields of the next row to be 
written.
-  private val reusableOutputBuffer = new Array[Any](dataSchema.length)
-
-  // Used to convert Catalyst values into Hadoop `Writable`s.
-  private val wrappers = structOI.getAllStructFieldRefs.map { ref =>
-wrapperFor(ref.getFieldObjectInspector)
-  }.toArray
+  private val allStructFieldRefs = structOI.getAllStructFieldRefs
--- End diff --

I don't see any usage of this variable.





[GitHub] spark pull request: [SPARK-8501] [SQL] Avoids reading schema from ...

2015-07-02 Thread zhzhan
Github user zhzhan commented on the pull request:

https://github.com/apache/spark/pull/7200#issuecomment-118190051
  
Some minor comments. Overall, LGTM.





[GitHub] spark pull request: [SPARK-8501] [SQL] Avoids reading schema from ...

2015-07-02 Thread zhzhan
Github user zhzhan commented on the pull request:

https://github.com/apache/spark/pull/7200#issuecomment-118187344
  
@liancheng In Spark, we do not create the ORC file if there are no records, 
so this only happens with ORC files created by Hive, right?





[GitHub] spark pull request: [SPARK-8501] [SQL] Avoids reading schema from ...

2015-07-02 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/7200#discussion_r33831074
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala ---
@@ -24,30 +24,58 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.hive.HiveMetastoreTypes
 import org.apache.spark.sql.types.StructType
 
-private[orc] object OrcFileOperator extends Logging{
-  def getFileReader(pathStr: String, config: Option[Configuration] = None 
): Reader = {
+private[orc] object OrcFileOperator extends Logging {
+  // TODO Needs to consider all files when schema evolution is taken into 
account.
+  def getFileReader(basePath: String, config: Option[Configuration] = 
None): Option[Reader] = {
+def isWithNonEmptySchema(path: Path, reader: Reader): Boolean = {
+  reader.getObjectInspector match {
+case oi: StructObjectInspector if oi.getAllStructFieldRefs.size() 
> 0 =>
+  true
+case oi: StructObjectInspector if oi.getAllStructFieldRefs.size() 
== 0 =>
+  logInfo(
+s"ORC file $path has empty schema, it probably contains no 
rows. " +
+  "Trying to read another ORC file to figure out the schema.")
+  false
+case _ => false

In what situation will the third case happen? If there is none, can we 
collapse the 2nd and 3rd cases?
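
For illustration, the collapsed form suggested here might look like the sketch below. The `Inspector` types are hypothetical stand-ins for Hive's object inspectors, and `hasNonEmptySchema` is not the name used in the diff.

```scala
// Hypothetical stand-ins for the object inspector hierarchy.
trait Inspector
final case class StructInspector(fieldCount: Int) extends Inspector
case object PrimitiveInspector extends Inspector

object SchemaCheck {
  // Only a struct inspector with at least one field counts as a non-empty schema;
  // an empty struct and any other inspector both fall through to the default case.
  def hasNonEmptySchema(oi: Inspector): Boolean = oi match {
    case StructInspector(n) if n > 0 => true
    case _ => false
  }
}
```

Collapsing the cases this way drops the log message that the diff emits for an empty struct, so keeping the second case separate may still be worthwhile if that message matters.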





[GitHub] spark pull request: [Spark-5111][SQL]HiveContext and Thriftserver ...

2015-06-17 Thread zhzhan
Github user zhzhan commented on the pull request:

https://github.com/apache/spark/pull/4064#issuecomment-112875139
  
@WangTaoTheTonic The problem happens with spark-1.3 and hadoop-2.6 in a 
Kerberos cluster. With hive-0.14 support, I suppose the problem may be gone, 
but I haven't verified it yet. I will close this since hive-0.14 is supported.





[GitHub] spark pull request: [Spark-5111][SQL]HiveContext and Thriftserver ...

2015-06-17 Thread zhzhan
Github user zhzhan closed the pull request at:

https://github.com/apache/spark/pull/4064





[GitHub] spark pull request: [SPARK-7009] repackaging spark assembly jar wi...

2015-06-16 Thread zhzhan
Github user zhzhan closed the pull request at:

https://github.com/apache/spark/pull/5637





[GitHub] spark pull request: [SPARK-7009] repackaging spark assembly jar wi...

2015-06-16 Thread zhzhan
Github user zhzhan commented on the pull request:

https://github.com/apache/spark/pull/5637#issuecomment-112615055
  
Closing this PR, as it may be outdated relative to the latest Spark upstream 
and may no longer work.





[GitHub] spark pull request: [SPARK-6112][Block Manager] Provide external b...

2015-06-01 Thread zhzhan
Github user zhzhan commented on the pull request:

https://github.com/apache/spark/pull/6449#issuecomment-107668164
  
@rxin Could you help review it and let me know your comments?




