This is an automated email from the ASF dual-hosted git repository. lixiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 687dd4e [SPARK-28260][SQL] Add CLOSED state to ExecutionState 687dd4e is described below commit 687dd4eb55739f802692b3c5457618fd6558e538 Author: Yuming Wang <yumw...@ebay.com> AuthorDate: Fri Jul 12 10:31:28 2019 -0700 [SPARK-28260][SQL] Add CLOSED state to ExecutionState ## What changes were proposed in this pull request? The `ThriftServerTab` displays a FINISHED state when the operation finishes execution, but quite often it still takes a lot of time to fetch the results. OperationState has state CLOSED for when after the iterator is closed. This PR add CLOSED state to ExecutionState, and override the `close()` in SparkExecuteStatementOperation, SparkGetColumnsOperation, SparkGetSchemasOperation and SparkGetTablesOperation. ## How was this patch tested? manual tests 1. Add `Thread.sleep(10000)` before [SparkExecuteStatementOperation.scala#L112](https://github.com/apache/spark/blob/b2e7677f4d3d8f47f5f148680af39d38f2b558f0/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala#L112) 2. Switch to `ThriftServerTab`: ![image](https://user-images.githubusercontent.com/5399861/60809590-9dcf2500-a1bd-11e9-826e-33729bb97daf.png) 3. After a while: ![image](https://user-images.githubusercontent.com/5399861/60809719-e850a180-a1bd-11e9-9a6a-546146e626ab.png) Closes #25062 from wangyum/SPARK-28260. Authored-by: Yuming Wang <yumw...@ebay.com> Signed-off-by: gatorsmile <gatorsm...@gmail.com> --- .../spark/sql/hive/thriftserver/HiveThriftServer2.scala | 14 ++++++++++---- .../hive/thriftserver/SparkExecuteStatementOperation.scala | 3 ++- .../sql/hive/thriftserver/SparkGetColumnsOperation.scala | 9 ++++++++- .../sql/hive/thriftserver/SparkGetSchemasOperation.scala | 9 ++++++++- .../sql/hive/thriftserver/SparkGetTablesOperation.scala | 9 ++++++++- .../spark/sql/hive/thriftserver/ui/ThriftServerPage.scala | 8 +++++--- .../sql/hive/thriftserver/ui/ThriftServerSessionPage.scala | 8 +++++--- 7 files changed, 46 insertions(+), 14 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index d1de9f0..b4d1d0d 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -137,7 +137,7 @@ object HiveThriftServer2 extends Logging { } private[thriftserver] object ExecutionState extends Enumeration { - val STARTED, COMPILED, FAILED, FINISHED = Value + val STARTED, COMPILED, FAILED, FINISHED, CLOSED = Value type ExecutionState = Value } @@ -147,16 +147,17 @@ object HiveThriftServer2 extends Logging { val startTimestamp: Long, val userName: String) { var finishTimestamp: Long = 0L + var closeTimestamp: Long = 0L var executePlan: String = "" var detail: String = "" var state: ExecutionState.Value = ExecutionState.STARTED val jobId: ArrayBuffer[String] = ArrayBuffer[String]() var groupId: String = "" - def totalTime: Long = { - if (finishTimestamp == 0L) { + def totalTime(endTime: Long): Long = { + if (endTime == 0L) { System.currentTimeMillis - startTimestamp } else { - finishTimestamp - startTimestamp + endTime - startTimestamp } } } @@ -254,6 +255,11 @@ object HiveThriftServer2 extends Logging { trimExecutionIfNecessary() } + def onOperationClosed(id: String): Unit = synchronized { + executionList(id).closeTimestamp = System.currentTimeMillis + executionList(id).state = ExecutionState.CLOSED + } + private def trimExecutionIfNecessary() = { if (executionList.size > retainedStatements) { val toRemove = math.max(retainedStatements / 10, 1) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 820f76d..2f011c2 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -70,11 +70,12 @@ private[hive] class SparkExecuteStatementOperation( } } - def close(): Unit = { + override def close(): Unit = { // RDDs will be cleaned automatically upon garbage collection. logDebug(s"CLOSING $statementId") cleanup(OperationState.CLOSED) sqlContext.sparkContext.clearJobGroup() + HiveThriftServer2.listener.onOperationClosed(statementId) } def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala index 99ba968..89faff2 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala @@ -58,8 +58,15 @@ private[hive] class SparkGetColumnsOperation( val catalog: SessionCatalog = sqlContext.sessionState.catalog + private var statementId: String = _ + + override def close(): Unit = { + super.close() + HiveThriftServer2.listener.onOperationClosed(statementId) + } + override def runInternal(): Unit = { - val statementId = UUID.randomUUID().toString + statementId = UUID.randomUUID().toString // Do not change cmdStr. It's used for Hive auditing and authorization. val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName, tablePattern : $tableName" val logMsg = s"Listing columns '$cmdStr, columnName : $columnName'" diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala index 3ecbbd0..87ef154 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala @@ -45,8 +45,15 @@ private[hive] class SparkGetSchemasOperation( schemaName: String) extends GetSchemasOperation(parentSession, catalogName, schemaName) with Logging { + private var statementId: String = _ + + override def close(): Unit = { + super.close() + HiveThriftServer2.listener.onOperationClosed(statementId) + } + override def runInternal(): Unit = { - val statementId = UUID.randomUUID().toString + statementId = UUID.randomUUID().toString // Do not change cmdStr. It's used for Hive auditing and authorization. val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName" val logMsg = s"Listing databases '$cmdStr'" diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala index 8786836..952de42 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala @@ -55,8 +55,15 @@ private[hive] class SparkGetTablesOperation( extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes) with Logging{ + private var statementId: String = _ + + override def close(): Unit = { + super.close() + HiveThriftServer2.listener.onOperationClosed(statementId) + } + override def runInternal(): Unit = { - val statementId = UUID.randomUUID().toString + statementId = UUID.randomUUID().toString // Do not change cmdStr. It's used for Hive auditing and authorization. val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName" val tableTypesStr = if (tableTypes == null) "null" else tableTypes.asScala.mkString(",") diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala index 27d2c99..1747b5b 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala @@ -70,8 +70,8 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" private def generateSQLStatsTable(request: HttpServletRequest): Seq[Node] = { val numStatement = listener.getExecutionList.size val table = if (numStatement > 0) { - val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish Time", "Duration", - "Statement", "State", "Detail") + val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish Time", "Close Time", + "Execution Time", "Duration", "Statement", "State", "Detail") val dataRows = listener.getExecutionList.sortBy(_.startTimestamp).reverse def generateDataRow(info: ExecutionInfo): Seq[Node] = { @@ -90,7 +90,9 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" <td>{info.groupId}</td> <td>{formatDate(info.startTimestamp)}</td> <td>{if (info.finishTimestamp > 0) formatDate(info.finishTimestamp)}</td> - <td>{formatDurationOption(Some(info.totalTime))}</td> + <td>{if (info.closeTimestamp > 0) formatDate(info.closeTimestamp)}</td> + <td>{formatDurationOption(Some(info.totalTime(info.finishTimestamp)))}</td> + <td>{formatDurationOption(Some(info.totalTime(info.closeTimestamp)))}</td> <td>{info.statement}</td> <td>{info.state}</td> {errorMessageCell(detail)} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala index fdc9bee..a45c6e3 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala @@ -79,8 +79,8 @@ private[ui] class ThriftServerSessionPage(parent: ThriftServerTab) .filter(_.sessionId == sessionID) val numStatement = executionList.size val table = if (numStatement > 0) { - val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish Time", "Duration", - "Statement", "State", "Detail") + val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish Time", "Close Time", + "Execution Time", "Duration", "Statement", "State", "Detail") val dataRows = executionList.sortBy(_.startTimestamp).reverse def generateDataRow(info: ExecutionInfo): Seq[Node] = { @@ -99,7 +99,9 @@ private[ui] class ThriftServerSessionPage(parent: ThriftServerTab) <td>{info.groupId}</td> <td>{formatDate(info.startTimestamp)}</td> <td>{formatDate(info.finishTimestamp)}</td> - <td>{formatDurationOption(Some(info.totalTime))}</td> + <td>{formatDate(info.closeTimestamp)}</td> + <td>{formatDurationOption(Some(info.totalTime(info.finishTimestamp)))}</td> + <td>{formatDurationOption(Some(info.totalTime(info.closeTimestamp)))}</td> <td>{info.statement}</td> <td>{info.state}</td> {errorMessageCell(detail)} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org