spark git commit: [SPARK-20464][SS] Add a job group and description for streaming queries and fix cancellation of running jobs using the job group

2017-05-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 38edb9256 -> 6f0d29672


[SPARK-20464][SS] Add a job group and description for streaming queries and fix cancellation of running jobs using the job group

## What changes were proposed in this pull request?

Job group: adding a job group is required to properly cancel running jobs related to a query.
Description: the new description makes it easier to group the batches of a query by sorting by name in the Spark Jobs UI.
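
For context, the cancellation mechanism relies on the standard `SparkContext` job-group API (`setJobGroup` / `cancelJobGroup`). Below is a minimal, self-contained sketch of that mechanism; the group id, the long-running job, and the thread wiring are illustrative assumptions, not part of this patch:

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession

object JobGroupCancellationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("job-group-sketch").getOrCreate()
    val sc = spark.sparkContext

    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        // setJobGroup is thread-local: it tags every job submitted from this thread.
        // interruptOnCancel = true asks Spark to interrupt running task threads on cancel.
        sc.setJobGroup("demo-run-id", "demo query", interruptOnCancel = true)
        try {
          sc.parallelize(1 to 1000000, 4).map { i => Thread.sleep(1); i }.count()
        } catch {
          case e: SparkException => println(s"job was cancelled: ${e.getMessage}")
        }
      }
    })
    worker.start()
    Thread.sleep(2000)

    // Cancelling the group stops every job tagged with it; this is the behavior
    // the patch relies on to stop a streaming query's in-flight batch jobs.
    sc.cancelJobGroup("demo-run-id")
    worker.join()
    spark.stop()
  }
}
```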

## How was this patch tested?

- Unit tests
- UI screenshot

  - Order by job id:
![screen shot 2017-04-27 at 5 10 09 pm](https://cloud.githubusercontent.com/assets/7865120/25509468/15452274-2b6e-11e7-87ba-d929816688cf.png)

  - Order by description:
![screen shot 2017-04-27 at 5 10 22 pm](https://cloud.githubusercontent.com/assets/7865120/25509474/1c298512-2b6e-11e7-99b8-fef1ef7665c1.png)

  - Order by job id (no query name):
![screen shot 2017-04-27 at 5 21 33 pm](https://cloud.githubusercontent.com/assets/7865120/25509482/28c96dc8-2b6e-11e7-8df0-9d3cdbb05e36.png)

  - Order by description (no query name):
![screen shot 2017-04-27 at 5 21 44 pm](https://cloud.githubusercontent.com/assets/7865120/25509489/37674742-2b6e-11e7-9357-b5c38ec16ac4.png)

Author: Kunal Khamar 

Closes #17765 from kunalkhamar/sc-6696.

(cherry picked from commit 6fc6cf88d871f5b05b0ad1a504e0d6213cf9d331)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f0d2967
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f0d2967
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f0d2967

Branch: refs/heads/branch-2.2
Commit: 6f0d29672512bcb720fb82bc92071207dfae5eb1
Parents: 38edb92
Author: Kunal Khamar 
Authored: Mon May 1 11:37:30 2017 -0700
Committer: Shixiong Zhu 
Committed: Mon May 1 11:37:44 2017 -0700

--
 .../scala/org/apache/spark/ui/UIUtils.scala        |  2 +-
 .../execution/streaming/StreamExecution.scala      | 12 ++++++++++++
 .../spark/sql/streaming/StreamSuite.scala          | 66 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 79 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6f0d2967/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index e53d690..79b0d81 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -446,7 +446,7 @@ private[spark] object UIUtils extends Logging {
       val xml = XML.loadString(s"""<span class="description-input">$desc</span>""")
 
       // Verify that this has only anchors and span (we are wrapping in span)
-      val allowedNodeLabels = Set("a", "span")
+      val allowedNodeLabels = Set("a", "span", "br")
       val illegalNodes = xml \\ "_" filterNot { case node: Node =>
         allowedNodeLabels.contains(node.label)
       }
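
The whitelist change matters because the new batch description embeds `<br/>` tags (see the StreamExecution change below); without `br` in the allowed set, the UI would reject the description as illegal HTML. A standalone sketch of the same check, using a hypothetical description value:

```scala
import scala.xml.{Node, XML}

// Hypothetical description in the shape this patch produces. Parse it wrapped
// in a span, as UIUtils does, and verify only whitelisted elements appear.
val desc = """query-name<br/>id = 0<br/>runId = 0<br/>batch = 3"""
val xml = XML.loadString(s"""<span class="description-input">$desc</span>""")
val allowedNodeLabels = Set("a", "span", "br")
val illegalNodes = (xml \\ "_").filterNot { node: Node =>
  allowedNodeLabels.contains(node.label)
}
assert(illegalNodes.isEmpty) // with "br" whitelisted, the description renders as HTML
```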

http://git-wip-us.apache.org/repos/asf/spark/blob/6f0d2967/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index bcf0d97..affc201 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -252,6 +252,8 @@ class StreamExecution(
    */
   private def runBatches(): Unit = {
     try {
+      sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString,
+        interruptOnCancel = true)
       if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
         sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
       }
@@ -289,6 +291,7 @@ class StreamExecution(
           if (currentBatchId < 0) {
             // We'll do this initialization only once
             populateStartOffsets(sparkSessionToRunBatches)
+            sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
             logDebug(s"Stream running from $committedOffsets to $availableOffsets")
           } else {
             constructNextBatch()
@@ -308,6 +311,7 @@ class StreamExecution(
           logDebug(s"batch ${currentBatchId} committed")
           // We'll increase currentBatchId after we complete processing current batch's data
           currentBatchId += 1
+          sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
         } else {
           currentStatus =
spark git commit: [SPARK-20464][SS] Add a job group and description for streaming queries and fix cancellation of running jobs using the job group

2017-05-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master ab30590f4 -> 6fc6cf88d



Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6fc6cf88
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6fc6cf88
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6fc6cf88

Branch: refs/heads/master
Commit: 6fc6cf88d871f5b05b0ad1a504e0d6213cf9d331
Parents: ab30590
Author: Kunal Khamar 
Authored: Mon May 1 11:37:30 2017 -0700
Committer: Shixiong Zhu 
Committed: Mon May 1 11:37:30 2017 -0700
