This is an automated email from the ASF dual-hosted git repository. zsxwing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 7173786 [SPARK-29543][SS][UI] Structured Streaming Web UI 7173786 is described below commit 71737861531180bbda9aec8d241b1428fe91cab2 Author: uncleGen <husty...@gmail.com> AuthorDate: Wed Jan 29 13:43:51 2020 -0800 [SPARK-29543][SS][UI] Structured Streaming Web UI ### What changes were proposed in this pull request? This PR adds two pages to Web UI for Structured Streaming: - "/streamingquery": Streaming Query Page, providing some aggregate information for running/completed streaming queries. - "/streamingquery/statistics": Streaming Query Statistics Page, providing detailed information for a streaming query, including `Input Rate`, `Process Rate`, `Input Rows`, `Batch Duration` and `Operation Duration` ![Screen Shot 2020-01-29 at 1 38 00 PM](https://user-images.githubusercontent.com/1000778/73399837-cd01cc80-429c-11ea-9d4b-1d200a41b8d5.png) ![Screen Shot 2020-01-29 at 1 39 16 PM](https://user-images.githubusercontent.com/1000778/73399838-cd01cc80-429c-11ea-8185-4e56db6866bd.png) ### Why are the changes needed? It helps users better monitor Structured Streaming queries. ### Does this PR introduce any user-facing change? No ### How was this patch tested? - newly added and existing UTs - manual test Closes #26201 from uncleGen/SPARK-29543. 
Lead-authored-by: uncleGen <husty...@gmail.com> Co-authored-by: Yuanjian Li <xyliyuanj...@gmail.com> Co-authored-by: Genmao Yu <husty...@gmail.com> Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> --- .../org/apache/spark}/ui/static/streaming-page.css | 0 .../org/apache/spark}/ui/static/streaming-page.js | 0 .../spark/ui/static/structured-streaming-page.js | 171 +++++++++++++ .../resources/org/apache/spark/ui/static/webui.js | 2 + .../scala/org/apache/spark/ui/GraphUIData.scala | 169 +++++++++++++ .../main/scala/org/apache/spark/ui/UIUtils.scala | 91 +++++++ .../scala/org/apache/spark/ui/jobs/StagePage.scala | 14 +- .../org/apache/spark/ui/jobs/StageTable.scala | 14 +- project/MimaExcludes.scala | 5 +- .../org/apache/spark/sql/internal/SQLConf.scala | 16 ++ .../sql/execution/streaming/ProgressReporter.scala | 5 +- .../sql/execution/streaming/StreamExecution.scala | 3 +- .../apache/spark/sql/internal/SharedState.scala | 19 +- .../sql/streaming/StreamingQueryListener.scala | 4 +- .../sql/streaming/StreamingQueryManager.scala | 6 +- .../org/apache/spark/sql/streaming/progress.scala | 2 + .../sql/streaming/ui/StreamingQueryPage.scala | 147 +++++++++++ .../ui/StreamingQueryStatisticsPage.scala | 271 +++++++++++++++++++++ .../ui/StreamingQueryStatusListener.scala | 122 ++++++++++ .../spark/sql/streaming/ui/StreamingQueryTab.scala | 33 +-- .../apache/spark/sql/streaming/ui/UIUtils.scala | 60 +++++ .../streaming/StreamingQueryListenerSuite.scala | 10 +- .../StreamingQueryStatusAndProgressSuite.scala | 2 + .../spark/sql/streaming/StreamingQuerySuite.scala | 14 +- .../sql/streaming/ui/StreamingQueryPageSuite.scala | 125 ++++++++++ .../ui/StreamingQueryStatusListenerSuite.scala | 101 ++++++++ .../spark/sql/streaming/ui/UIUtilsSuite.scala | 41 ++++ .../hive/thriftserver/ui/ThriftServerPage.scala | 16 +- .../apache/spark/streaming/dstream/DStream.scala | 4 +- .../spark/streaming/scheduler/JobScheduler.scala | 4 +- .../spark/streaming/ui/AllBatchesTable.scala | 2 +- 
.../org/apache/spark/streaming/ui/BatchPage.scala | 2 +- .../apache/spark/streaming/ui/StreamingPage.scala | 125 +--------- .../apache/spark/streaming/ui/StreamingTab.scala | 2 +- .../org/apache/spark/streaming/ui/UIUtils.scala | 71 +----- .../apache/spark/streaming/DStreamScopeSuite.scala | 6 +- .../apache/spark/streaming/ui/UIUtilsSuite.scala | 12 +- 37 files changed, 1408 insertions(+), 283 deletions(-) diff --git a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.css b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css similarity index 100% rename from streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.css rename to core/src/main/resources/org/apache/spark/ui/static/streaming-page.css diff --git a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js similarity index 100% rename from streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js rename to core/src/main/resources/org/apache/spark/ui/static/streaming-page.js diff --git a/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.js b/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.js new file mode 100644 index 0000000..70250fd --- /dev/null +++ b/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.js @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// pre-define some colors for legends. +var colorPool = ["#F8C471", "#F39C12", "#B9770E", "#73C6B6", "#16A085", "#117A65", "#B2BABB", "#7F8C8D", "#616A6B"]; + +function drawAreaStack(id, labels, values, minX, maxX, minY, maxY) { + d3.select(d3.select(id).node().parentNode) + .style("padding", "8px 0 8px 8px") + .style("border-right", "0px solid white"); + + // Setup svg using Bostock's margin convention + var margin = {top: 20, right: 40, bottom: 30, left: maxMarginLeftForTimeline}; + var width = 850 - margin.left - margin.right; + var height = 300 - margin.top - margin.bottom; + + var svg = d3.select(id) + .append("svg") + .attr("width", width + margin.left + margin.right) + .attr("height", height + margin.top + margin.bottom) + .append("g") + .attr("transform", "translate(" + margin.left + "," + margin.top + ")"); + + var data = values; + + var parse = d3.time.format("%H:%M:%S.%L").parse; + + // Transpose the data into layers + var dataset = d3.layout.stack()(labels.map(function(fruit) { + return data.map(function(d) { + return {_x: d.x, x: parse(d.x), y: +d[fruit]}; + }); + })); + + + // Set x, y and colors + var x = d3.scale.ordinal() + .domain(dataset[0].map(function(d) { return d.x; })) + .rangeRoundBands([10, width-10], 0.02); + + var y = d3.scale.linear() + .domain([0, d3.max(dataset, function(d) { return d3.max(d, function(d) { return d.y0 + d.y; }); })]) + .range([height, 0]); + + var colors = colorPool.slice(0, labels.length) + + // Define and draw axes + var yAxis = d3.svg.axis() + .scale(y) + .orient("left") + .ticks(7) + 
.tickFormat( function(d) { return d } ); + + var xAxis = d3.svg.axis() + .scale(x) + .orient("bottom") + .tickFormat(d3.time.format("%H:%M:%S.%L")); + + // Only show the first and last time in the graph + var xline = [] + xline.push(x.domain()[0]) + xline.push(x.domain()[x.domain().length - 1]) + xAxis.tickValues(xline); + + svg.append("g") + .attr("class", "y axis") + .call(yAxis) + .append("text") + .attr("transform", "translate(0," + unitLabelYOffset + ")") + .text("ms"); + + svg.append("g") + .attr("class", "x axis") + .attr("transform", "translate(0," + height + ")") + .call(xAxis); + + // Create groups for each series, rects for each segment + var groups = svg.selectAll("g.cost") + .data(dataset) + .enter().append("g") + .attr("class", "cost") + .style("fill", function(d, i) { return colors[i]; }); + + var rect = groups.selectAll("rect") + .data(function(d) { return d; }) + .enter() + .append("rect") + .attr("x", function(d) { return x(d.x); }) + .attr("y", function(d) { return y(d.y0 + d.y); }) + .attr("height", function(d) { return y(d.y0) - y(d.y0 + d.y); }) + .attr("width", x.rangeBand()) + .on('mouseover', function(d) { + var tip = ''; + var idx = 0; + var _values = timeToValues[d._x] + _values.forEach(function (k) { + tip += labels[idx] + ': ' + k + ' '; + idx += 1; + }); + tip += " at " + d._x + showBootstrapTooltip(d3.select(this).node(), tip); + }) + .on('mouseout', function() { + hideBootstrapTooltip(d3.select(this).node()); + }) + .on("mousemove", function(d) { + var xPosition = d3.mouse(this)[0] - 15; + var yPosition = d3.mouse(this)[1] - 25; + tooltip.attr("transform", "translate(" + xPosition + "," + yPosition + ")"); + tooltip.select("text").text(d.y); + }); + + + // Draw legend + var legend = svg.selectAll(".legend") + .data(colors) + .enter().append("g") + .attr("class", "legend") + .attr("transform", function(d, i) { return "translate(30," + i * 19 + ")"; }); + + legend.append("rect") + .attr("x", width - 20) + .attr("width", 18) + 
.attr("height", 18) + .style("fill", function(d, i) {return colors.slice().reverse()[i];}) + .on('mouseover', function(d, i) { + var len = labels.length + showBootstrapTooltip(d3.select(this).node(), labels[len - 1 - i]); + }) + .on('mouseout', function() { + hideBootstrapTooltip(d3.select(this).node()); + }) + .on("mousemove", function(d) { + var xPosition = d3.mouse(this)[0] - 15; + var yPosition = d3.mouse(this)[1] - 25; + tooltip.attr("transform", "translate(" + xPosition + "," + yPosition + ")"); + tooltip.select("text").text(d.y); + }); + + // Prep the tooltip bits, initial display is hidden + var tooltip = svg.append("g") + .attr("class", "tooltip") + .style("display", "none"); + + tooltip.append("rect") + .attr("width", 30) + .attr("height", 20) + .attr("fill", "white") + .style("opacity", 0.5); + + tooltip.append("text") + .attr("x", 15) + .attr("dy", "1.2em") + .style("text-anchor", "middle") + .attr("font-size", "12px") + .attr("font-weight", "bold"); +} diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.js b/core/src/main/resources/org/apache/spark/ui/static/webui.js index fac464e..0ba461f 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/webui.js +++ b/core/src/main/resources/org/apache/spark/ui/static/webui.js @@ -90,6 +90,8 @@ $(function() { collapseTablePageLoad('collapse-aggregated-sessionstat','aggregated-sessionstat'); collapseTablePageLoad('collapse-aggregated-sqlstat','aggregated-sqlstat'); collapseTablePageLoad('collapse-aggregated-sqlsessionstat','aggregated-sqlsessionstat'); + collapseTablePageLoad('collapse-aggregated-activeQueries','aggregated-activeQueries'); + collapseTablePageLoad('collapse-aggregated-completedQueries','aggregated-completedQueries'); }); $(function() { diff --git a/core/src/main/scala/org/apache/spark/ui/GraphUIData.scala b/core/src/main/scala/org/apache/spark/ui/GraphUIData.scala new file mode 100644 index 0000000..87ff677 --- /dev/null +++ 
b/core/src/main/scala/org/apache/spark/ui/GraphUIData.scala @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui + +import java.{util => ju} +import java.lang.{Long => JLong} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.xml.{Node, Unparsed} + +/** + * A helper class to generate JavaScript and HTML for both timeline and histogram graphs. 
+ * + * @param timelineDivId the timeline `id` used in the html `div` tag + * @param histogramDivId the histogram `id` used in the html `div` tag + * @param data the data for the graph + * @param minX the min value of X axis + * @param maxX the max value of X axis + * @param minY the min value of Y axis + * @param maxY the max value of Y axis + * @param unitY the unit of Y axis + * @param batchInterval if `batchInterval` is not None, we will draw a line for `batchInterval` in + * the graph + */ +private[spark] class GraphUIData( + timelineDivId: String, + histogramDivId: String, + data: Seq[(Long, Double)], + minX: Long, + maxX: Long, + minY: Double, + maxY: Double, + unitY: String, + batchInterval: Option[Double] = None) { + + private var dataJavaScriptName: String = _ + + def generateDataJs(jsCollector: JsCollector): Unit = { + val jsForData = data.map { case (x, y) => + s"""{"x": $x, "y": $y}""" + }.mkString("[", ",", "]") + dataJavaScriptName = jsCollector.nextVariableName + jsCollector.addPreparedStatement(s"var $dataJavaScriptName = $jsForData;") + } + + def generateTimelineHtml(jsCollector: JsCollector): Seq[Node] = { + jsCollector.addPreparedStatement(s"registerTimeline($minY, $maxY);") + if (batchInterval.isDefined) { + jsCollector.addStatement( + "drawTimeline(" + + s"'#$timelineDivId', $dataJavaScriptName, $minX, $maxX, $minY, $maxY, '$unitY'," + + s" ${batchInterval.get}" + + ");") + } else { + jsCollector.addStatement( + s"drawTimeline('#$timelineDivId', $dataJavaScriptName, $minX, $maxX, $minY, $maxY," + + s" '$unitY');") + } + <div id={timelineDivId}></div> + } + + def generateHistogramHtml(jsCollector: JsCollector): Seq[Node] = { + val histogramData = s"$dataJavaScriptName.map(function(d) { return d.y; })" + jsCollector.addPreparedStatement(s"registerHistogram($histogramData, $minY, $maxY);") + if (batchInterval.isDefined) { + jsCollector.addStatement( + "drawHistogram(" + + s"'#$histogramDivId', $histogramData, $minY, $maxY, '$unitY',
${batchInterval.get}" + + ");") + } else { + jsCollector.addStatement( + s"drawHistogram('#$histogramDivId', $histogramData, $minY, $maxY, '$unitY');") + } + <div id={histogramDivId}></div> + } + + def generateAreaStackHtmlWithData( + jsCollector: JsCollector, + values: Array[(Long, ju.Map[String, JLong])]): Seq[Node] = { + val operationLabels = values.flatMap(_._2.keySet().asScala).toSet + val durationDataPadding = UIUtils.durationDataPadding(values) + val jsForData = durationDataPadding.map { case (x, y) => + val s = y.toSeq.sortBy(_._1).map(e => s""""${e._1}": "${e._2}"""").mkString(",") + s"""{x: "${UIUtils.formatBatchTime(x, 1, showYYYYMMSS = false)}", $s}""" + }.mkString("[", ",", "]") + val jsForLabels = operationLabels.toSeq.sorted.mkString("[\"", "\",\"", "\"]") + + val (maxX, minX, maxY, minY) = if (values != null && values.length > 0) { + val xValues = values.map(_._1.toLong) + val yValues = values.map(_._2.asScala.toSeq.map(_._2.toLong).sum) + (xValues.max, xValues.min, yValues.max, yValues.min) + } else { + (0L, 0L, 0L, 0L) + } + + dataJavaScriptName = jsCollector.nextVariableName + jsCollector.addPreparedStatement(s"var $dataJavaScriptName = $jsForData;") + val labels = jsCollector.nextVariableName + jsCollector.addPreparedStatement(s"var $labels = $jsForLabels;") + jsCollector.addStatement( + s"drawAreaStack('#$timelineDivId', $labels, $dataJavaScriptName, $minX, $maxX, $minY, $maxY)") + <div id={timelineDivId}></div> + } +} + +/** + * A helper class that allows the user to add JavaScript statements which will be executed when the + * DOM has finished loading. 
+ */ +private[spark] class JsCollector { + + private var variableId = 0 + + /** + * Return the next unused JavaScript variable name + */ + def nextVariableName: String = { + variableId += 1 + "v" + variableId + } + + /** + * JavaScript statements that will execute before `statements` + */ + private val preparedStatements = ArrayBuffer[String]() + + /** + * JavaScript statements that will execute after `preparedStatements` + */ + private val statements = ArrayBuffer[String]() + + def addPreparedStatement(js: String): Unit = { + preparedStatements += js + } + + def addStatement(js: String): Unit = { + statements += js + } + + /** + * Generate a html snippet that will execute all scripts when the DOM has finished loading. + */ + def toHtml: Seq[Node] = { + val js = + s""" + |$$(document).ready(function() { + | ${preparedStatements.mkString("\n")} + | ${statements.mkString("\n")} + |});""".stripMargin + + <script>{Unparsed(js)}</script> + } +} diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 143303d..94c4521 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -17,6 +17,8 @@ package org.apache.spark.ui +import java.{util => ju} +import java.lang.{Long => JLong} import java.net.URLDecoder import java.nio.charset.StandardCharsets.UTF_8 import java.text.SimpleDateFormat @@ -24,6 +26,7 @@ import java.util.{Date, Locale, TimeZone} import javax.servlet.http.HttpServletRequest import javax.ws.rs.core.{MediaType, Response} +import scala.collection.JavaConverters._ import scala.util.control.NonFatal import scala.xml._ import scala.xml.transform.{RewriteRule, RuleTransformer} @@ -119,6 +122,59 @@ private[spark] object UIUtils extends Logging { } } + // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use. 
+ private val batchTimeFormat = new ThreadLocal[SimpleDateFormat]() { + override def initialValue(): SimpleDateFormat = + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss", Locale.US) + } + + private val batchTimeFormatWithMilliseconds = new ThreadLocal[SimpleDateFormat]() { + override def initialValue(): SimpleDateFormat = + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS", Locale.US) + } + + /** + * If `batchInterval` is less than 1 second, format `batchTime` with milliseconds. Otherwise, + * format `batchTime` without milliseconds. + * + * @param batchTime the batch time to be formatted + * @param batchInterval the batch interval + * @param showYYYYMMSS if showing the `yyyy/MM/dd` part. If it's false, the return value will be + * only `HH:mm:ss` or `HH:mm:ss.SSS` depending on `batchInterval` + * @param timezone only for test + */ + def formatBatchTime( + batchTime: Long, + batchInterval: Long, + showYYYYMMSS: Boolean = true, + timezone: TimeZone = null): String = { + val oldTimezones = + (batchTimeFormat.get.getTimeZone, batchTimeFormatWithMilliseconds.get.getTimeZone) + if (timezone != null) { + batchTimeFormat.get.setTimeZone(timezone) + batchTimeFormatWithMilliseconds.get.setTimeZone(timezone) + } + try { + val formattedBatchTime = + if (batchInterval < 1000) { + batchTimeFormatWithMilliseconds.get.format(batchTime) + } else { + // If batchInterval >= 1 second, don't show milliseconds + batchTimeFormat.get.format(batchTime) + } + if (showYYYYMMSS) { + formattedBatchTime + } else { + formattedBatchTime.substring(formattedBatchTime.indexOf(' ') + 1) + } + } finally { + if (timezone != null) { + batchTimeFormat.get.setTimeZone(oldTimezones._1) + batchTimeFormatWithMilliseconds.get.setTimeZone(oldTimezones._2) + } + } + } + /** Generate a human-readable string representing a number (e.g.
100 K) */ def formatNumber(records: Double): String = { val trillion = 1e12 @@ -572,4 +628,39 @@ private[spark] object UIUtils extends Logging { def buildErrorResponse(status: Response.Status, msg: String): Response = { Response.status(status).entity(msg).`type`(MediaType.TEXT_PLAIN).build() } + + /** + * There may be different duration labels in each batch. So we need to + * mark those missing duration labels as '0d' to avoid UI rendering errors. + */ + def durationDataPadding( + values: Array[(Long, ju.Map[String, JLong])]): Array[(Long, Map[String, Double])] = { + val operationLabels = values.flatMap(_._2.keySet().asScala).toSet + values.map { case (xValue, yValue) => + val dataPadding = operationLabels.map { opLabel => + if (yValue.containsKey(opLabel)) { + (opLabel, yValue.get(opLabel).toDouble) + } else { + (opLabel, 0d) + } + } + (xValue, dataPadding.toMap) + } + } + + def detailsUINode(isMultiline: Boolean, message: String): Seq[Node] = { + if (isMultiline) { + // scalastyle:off + <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')" + class="expand-details"> + +details + </span> ++ + <div class="stacktrace-details collapsed"> + <pre>{message}</pre> + </div> + // scalastyle:on + } else { + Seq.empty[Node] + } + } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 4dc5349..ccaa70b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -721,19 +721,7 @@ private[ui] class TaskPagedTable( } else { error }) - val details = if (isMultiline) { - // scalastyle:off - <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')" - class="expand-details"> - +details - </span> ++ - <div class="stacktrace-details collapsed"> - <pre>{error}</pre> - </div> - // scalastyle:on - } else { - "" - } + val details =
UIUtils.detailsUINode(isMultiline, error) <td>{errorSummary}{details}</td> } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index ac431c9..a7d38e9 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -309,19 +309,7 @@ private[ui] class StagePagedTable( } else { failureReason }) - val details = if (isMultiline) { - // scalastyle:off - <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')" - class="expand-details"> - +details - </span> ++ - <div class="stacktrace-details collapsed"> - <pre>{failureReason}</pre> - </div> - // scalastyle:on - } else { - "" - } + val details = UIUtils.detailsUINode(isMultiline, failureReason) <td valign="middle">{failureReasonSummary}{details}</td> } diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 68e9313..65ffa22 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -489,7 +489,10 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.AFTSurvivalRegressionModel.setPredictionCol"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.AFTSurvivalRegression.setFeaturesCol"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.AFTSurvivalRegression.setLabelCol"), - ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.AFTSurvivalRegression.setPredictionCol") + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.AFTSurvivalRegression.setPredictionCol"), + + // [SPARK-29543][SS][UI] Init structured streaming ui + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStartedEvent.this") ) // Exclude rules for 2.4.x diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 0e0a814..e13d65b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1150,6 +1150,18 @@ object SQLConf { .booleanConf .createWithDefault(true) + val STREAMING_UI_ENABLED = + buildConf("spark.sql.streaming.ui.enabled") + .doc("Whether to run the structured streaming UI for the Spark application.") + .booleanConf + .createWithDefault(true) + + val STREAMING_UI_INACTIVE_QUERY_RETENTION = + buildConf("spark.sql.streaming.ui.numInactiveQueries") + .doc("The number of inactive queries to retain for structured streaming ui.") + .intConf + .createWithDefault(100) + val VARIABLE_SUBSTITUTE_ENABLED = buildConf("spark.sql.variable.substitute") .doc("This enables substitution using syntax like ${var} ${system:var} and ${env:var}.") @@ -2262,6 +2274,10 @@ class SQLConf extends Serializable with Logging { def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED) + def isStreamingUIEnabled: Boolean = getConf(STREAMING_UI_ENABLED) + + def streamingUIInactiveQueryRetention: Int = getConf(STREAMING_UI_INACTIVE_QUERY_RETENTION) + def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS) def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 71bcd53..f20291e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -148,8 +148,8 @@ trait ProgressReporter extends Logging { currentTriggerEndTimestamp = 
triggerClock.getTimeMillis() val executionStats = extractExecutionStats(hasNewData) - val processingTimeSec = Math.max(1L, - currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND + val processingTimeMills = currentTriggerEndTimestamp - currentTriggerStartTimestamp + val processingTimeSec = Math.max(1L, processingTimeMills).toDouble / MILLIS_PER_SECOND val inputTimeSec = if (lastTriggerStartTimestamp >= 0) { (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND @@ -181,6 +181,7 @@ trait ProgressReporter extends Logging { name = name, timestamp = formatTimestamp(currentTriggerStartTimestamp), batchId = currentBatchId, + batchDuration = processingTimeMills, durationMs = new java.util.HashMap(currentDurationsMs.toMap.mapValues(long2Long).asJava), eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava), stateOperators = executionStats.stateOperators.toArray, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 6dff5c6..ed908a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -307,7 +307,8 @@ abstract class StreamExecution( } // `postEvent` does not throw non fatal exception. 
- postEvent(new QueryStartedEvent(id, runId, name)) + val submissionTime = triggerClock.getTimeMillis() + postEvent(new QueryStartedEvent(id, runId, name, submissionTime)) // Unblock starting thread startLatch.countDown() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index de3805e..fefd72d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.internal import java.net.URL -import java.util.{Locale, UUID} +import java.util.UUID import java.util.concurrent.ConcurrentHashMap import javax.annotation.concurrent.GuardedBy @@ -36,6 +36,8 @@ import org.apache.spark.sql.execution.CacheManager import org.apache.spark.sql.execution.streaming.StreamExecution import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab} import org.apache.spark.sql.internal.StaticSQLConf._ +import org.apache.spark.sql.streaming.StreamingQueryListener +import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, StreamingQueryTab} import org.apache.spark.status.ElementTrackingStore import org.apache.spark.util.Utils @@ -139,6 +141,21 @@ private[sql] class SharedState( } /** + * A [[StreamingQueryListener]] for structured streaming ui, it contains all streaming query ui + * data to show. + */ + lazy val streamingQueryStatusListener: Option[StreamingQueryStatusListener] = { + val sqlConf = SQLConf.get + if (sqlConf.isStreamingUIEnabled) { + val statusListener = new StreamingQueryStatusListener(sqlConf) + sparkContext.ui.foreach(new StreamingQueryTab(statusListener, _)) + Some(statusListener) + } else { + None + } + } + + /** * A catalog that interacts with external systems. 
*/ lazy val externalCatalog: ExternalCatalogWithListener = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index cc81cf6..dd842cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -82,13 +82,15 @@ object StreamingQueryListener { * @param id A unique query id that persists across restarts. See `StreamingQuery.id()`. * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`. * @param name User-specified name of the query, null if not specified. + * @param submissionTime The timestamp to start a query. * @since 2.1.0 */ @Evolving class QueryStartedEvent private[sql]( val id: UUID, val runId: UUID, - val name: String) extends Event + val name: String, + val submissionTime: Long) extends Event /** * Event representing any progress updates in a query. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 810f4a1..4d0d8ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Evolving import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.UI.UI_ENABLED import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession} import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table} @@ -37,7 +38,7 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS -import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} +import org.apache.spark.util.{Clock, SystemClock, Utils} /** * A class to manage all the [[StreamingQuery]] active in a `SparkSession`. 
@@ -68,6 +69,9 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
           logInfo(s"Registered listener ${listener.getClass.getName}")
         })
       }
+      sparkSession.sharedState.streamingQueryStatusListener.foreach { listener =>
+        addListener(listener)
+      }
     } catch {
       case e: Exception =>
         throw new SparkException("Exception when registering StreamingQueryListener", e)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index a9681db..13b506b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -85,6 +85,7 @@ class StateOperatorProgress private[sql](
  *                case of retries after a failure a given batchId my be executed more than once.
  *                Similarly, when there is no data to be processed, the batchId will not be
  *                incremented.
+ * @param batchDuration The time taken to process this batch, in milliseconds.
  * @param durationMs The amount of time taken to perform various operations in milliseconds.
  * @param eventTime Statistics of event time seen in this batch. It may contain the following keys:
  *                   {{{
@@ -105,6 +106,7 @@ class StreamingQueryProgress private[sql](
   val name: String,
   val timestamp: String,
   val batchId: Long,
+  val batchDuration: Long,
   val durationMs: ju.Map[String, JLong],
   val eventTime: ju.Map[String, String],
   val stateOperators: Array[StateOperatorProgress],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
new file mode 100644
index 0000000..650f64f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import java.text.SimpleDateFormat
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.commons.lang3.StringEscapeUtils
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
+import org.apache.spark.sql.streaming.ui.UIUtils._
+import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
+
+/**
+ * Landing page of the Structured Streaming tab. Lists the active and the completed queries
+ * reported by the tab's [[StreamingQueryStatusListener]]; each run id links to the statistics
+ * page of that run.
+ */
+private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
+    extends WebUIPage("") with Logging {
+
+  /** Parser for the ISO-8601 UTC timestamps carried by `StreamingQueryProgress`. */
+  val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+  df.setTimeZone(getTimeZone("UTC"))
+
+  /**
+   * Parses a progress timestamp into epoch milliseconds. `SimpleDateFormat` is not thread-safe
+   * and this page may render several requests concurrently, so access to `df` is serialized.
+   */
+  private def parseTimestampMs(timestamp: String): Long = df.synchronized {
+    df.parse(timestamp).getTime
+  }
+
+  override def render(request: HttpServletRequest): Seq[Node] = {
+    val content = generateStreamingQueryTable(request)
+    SparkUIUtils.headerSparkPage(request, "Streaming Query", content, parent)
+  }
+
+  /**
+   * Renders the table row of one query.
+   *
+   * @param request the current request, used to build the link to the statistics page
+   * @param queryActive whether the row is for the active-queries table; only rows of the
+   *                    completed-queries table get an error cell
+   * @param query the query to render
+   */
+  def generateDataRow(request: HttpServletRequest, queryActive: Boolean)
+    (query: StreamingQueryUIData): Seq[Node] = {
+
+    // Error cell: the first line of the message, with the full text in an expandable node.
+    def details(detailString: String): Seq[Node] = {
+      if (queryActive) {
+        Seq.empty[Node]
+      } else {
+        val newlineIndex = detailString.indexOf('\n')
+        val isMultiline = newlineIndex >= 0
+        val summary = StringEscapeUtils.escapeHtml4(
+          if (isMultiline) detailString.substring(0, newlineIndex) else detailString)
+        val detailsNode = SparkUIUtils.detailsUINode(isMultiline, detailString)
+        <td>{summary}{detailsNode}</td>
+      }
+    }
+
+    val statisticsLink = "%s/%s/statistics?id=%s"
+      .format(SparkUIUtils.prependBaseUri(request, parent.basePath), parent.prefix, query.runId)
+
+    val name = UIUtils.getQueryName(query)
+    val status = UIUtils.getQueryStatus(query)
+    val duration = if (queryActive) {
+      SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submissionTime)
+    } else {
+      withNoProgress(query, {
+        val endTimeMs = parseTimestampMs(query.lastProgress.timestamp)
+        SparkUIUtils.formatDurationVerbose(endTimeMs - query.submissionTime)
+      }, "-")
+    }
+
+    // Each average below takes one snapshot of the retained progress, so that the sum and the
+    // count are computed over the same batches.
+    <tr>
+      <td> {name} </td>
+      <td> {status} </td>
+      <td> {query.id} </td>
+      <td> <a href={statisticsLink}> {query.runId} </a> </td>
+      <td> {SparkUIUtils.formatDate(query.submissionTime)} </td>
+      <td> {duration} </td>
+      <td> {withNoProgress(query, {
+        val rates = query.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond))
+        (rates.sum / rates.length).formatted("%.2f") }, "NaN")}
+      </td>
+      <td> {withNoProgress(query, {
+        val rates = query.recentProgress.map(p => withNumberInvalid(p.processedRowsPerSecond))
+        (rates.sum / rates.length).formatted("%.2f") }, "NaN")}
+      </td>
+      <td> {withNoProgress(query, { query.lastProgress.batchId }, "NaN")} </td>
+      {details(query.exception.getOrElse("-"))}
+    </tr>
+  }
+
+  /** Renders the collapsible tables of active and completed queries. */
+  private def generateStreamingQueryTable(request: HttpServletRequest): Seq[Node] = {
+    val (activeQueries, inactiveQueries) = parent.statusListener.allQueryStatus
+      .partition(_.isActive)
+    // Columns shared by both tables; the completed-queries table adds an "Error" column.
+    val commonHeaders = Seq(
+      "Name", "Status", "Id", "Run ID", "Submitted Time", "Duration", "Avg Input /sec",
+      "Avg Process /sec", "Latest Batch")
+
+    val activeQueryTables = if (activeQueries.nonEmpty) {
+      Some(SparkUIUtils.listingTable(commonHeaders, generateDataRow(request, queryActive = true),
+        activeQueries, true, None, Seq(null), false))
+    } else {
+      None
+    }
+
+    val inactiveQueryTables = if (inactiveQueries.nonEmpty) {
+      Some(SparkUIUtils.listingTable(commonHeaders :+ "Error",
+        generateDataRow(request, queryActive = false), inactiveQueries, true, None, Seq(null),
+        false))
+    } else {
+      None
+    }
+
+    // The two section headers need distinct element ids: HTML ids must be unique in a page.
+    // scalastyle:off
+    val content =
+      <span id="active" class="collapse-aggregated-activeQueries collapse-table"
+          onClick="collapseTable('collapse-aggregated-activeQueries','aggregated-activeQueries')">
+        <h5 id="activequeries">
+          <span class="collapse-table-arrow arrow-open"></span>
+          <a>Active Streaming Queries ({activeQueries.length})</a>
+        </h5>
+      </span> ++
+      <div>
+        <ul class="aggregated-activeQueries collapsible-table">
+          {activeQueryTables.getOrElse(Seq.empty[Node])}
+        </ul>
+      </div> ++
+      <span id="completed" class="collapse-aggregated-completedQueries collapse-table"
+          onClick="collapseTable('collapse-aggregated-completedQueries','aggregated-completedQueries')">
+        <h5 id="completedqueries">
+          <span class="collapse-table-arrow arrow-open"></span>
+          <a>Completed Streaming Queries ({inactiveQueries.length})</a>
+        </h5>
+      </span> ++
+      <div>
+        <ul class="aggregated-completedQueries collapsible-table">
+          {inactiveQueryTables.getOrElse(Seq.empty[Node])}
+        </ul>
+      </div>
+    // scalastyle:on
+
+    content
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
new file mode 100644
index 0000000..56672ce
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import java.{util => ju}
+import java.lang.{Long => JLong}
+import java.text.SimpleDateFormat
+import java.util.UUID
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{Node, Unparsed}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
+import org.apache.spark.sql.streaming.StreamingQueryProgress
+import org.apache.spark.sql.streaming.ui.UIUtils._
+import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage}
+
+/**
+ * Statistics page of a single streaming query run, selected by the run id passed as the `id`
+ * request parameter. Shows basic information about the run and timeline/histogram charts built
+ * from the progress updates retained by [[StreamingQueryStatusListener]].
+ */
+private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
+  extends WebUIPage("statistics") with Logging {
+
+  /** Parser for the ISO-8601 UTC timestamps carried by `StreamingQueryProgress`. */
+  val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+  df.setTimeZone(getTimeZone("UTC"))
+
+  /**
+   * Parses a progress timestamp into epoch milliseconds. `SimpleDateFormat` is not thread-safe
+   * and this page may render several requests concurrently, so access to `df` is serialized.
+   */
+  private def parseTimestampMs(timestamp: String): Long = df.synchronized {
+    df.parse(timestamp).getTime
+  }
+
+  /** Script and stylesheet tags required by the charts of this page. */
+  def generateLoadResources(request: HttpServletRequest): Seq[Node] = {
+    // scalastyle:off
+    <script src={SparkUIUtils.prependBaseUri(request, "/static/d3.min.js")}></script>
+    <link rel="stylesheet" href={SparkUIUtils.prependBaseUri(request, "/static/streaming-page.css")} type="text/css"/>
+    <script src={SparkUIUtils.prependBaseUri(request, "/static/streaming-page.js")}></script>
+    <script src={SparkUIUtils.prependBaseUri(request, "/static/structured-streaming-page.js")}></script>
+    // scalastyle:on
+  }
+
+  override def render(request: HttpServletRequest): Seq[Node] = {
+    val parameterId = request.getParameter("id")
+    require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
+
+    // Parse the run id once instead of once per tracked query.
+    val runId = UUID.fromString(parameterId)
+    val query = parent.statusListener.allQueryStatus.find(_.runId == runId).getOrElse {
+      throw new IllegalArgumentException(s"Failed to find streaming query $parameterId")
+    }
+
+    val content =
+      generateLoadResources(request) ++
+        generateBasicInfo(query) ++
+        generateStatTable(query)
+    SparkUIUtils.headerSparkPage(request, "Streaming Query Statistics", content, parent)
+  }
+
+  /** Emits a `timeFormat` JavaScript object mapping each batch time to its formatted label. */
+  def generateTimeMap(times: Seq[Long]): Seq[Node] = {
+    val js = "var timeFormat = {};\n" + times.map { time =>
+      val formattedTime = SparkUIUtils.formatBatchTime(time, 1, showYYYYMMSS = false)
+      s"timeFormat[$time] = '$formattedTime';"
+    }.mkString("\n")
+
+    <script>{Unparsed(js)}</script>
+  }
+
+  /**
+   * Emits a `timeToValues` JavaScript object mapping each formatted batch time to the
+   * operation durations of that batch, as strings ordered by key. `values` is padded by
+   * `SparkUIUtils.durationDataPadding` first.
+   */
+  def generateVar(values: Array[(Long, ju.Map[String, JLong])]): Seq[Node] = {
+    val durationDataPadding = SparkUIUtils.durationDataPadding(values)
+    val js = "var timeToValues = {};\n" + durationDataPadding.map { case (x, y) =>
+      val s = y.toSeq.sortBy(_._1).map(e => s""""${e._2}"""").mkString("[", ",", "]")
+      s"""timeToValues["${SparkUIUtils.formatBatchTime(x, 1, showYYYYMMSS = false)}"] = $s;"""
+    }.mkString("\n")
+
+    <script>{Unparsed(js)}</script>
+  }
+
+  /** Summary shown above the charts: running time, submission time, batch count and ids. */
+  def generateBasicInfo(query: StreamingQueryUIData): Seq[Node] = {
+    val duration = if (query.isActive) {
+      SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submissionTime)
+    } else {
+      // Measure from the submission time, which the text below reports as the start. The
+      // oldest retained progress stops being the first batch once older updates have been
+      // evicted by the progress retention limit.
+      withNoProgress(query, {
+        val endTimeMs = parseTimestampMs(query.lastProgress.timestamp)
+        SparkUIUtils.formatDurationVerbose(endTimeMs - query.submissionTime)
+      }, "-")
+    }
+
+    val name = UIUtils.getQueryName(query)
+    val numBatches = withNoProgress(query, { query.lastProgress.batchId + 1L }, 0L)
+    <div>Running batches for
+      <strong>
+        {duration}
+      </strong>
+      since
+      <strong>
+        {SparkUIUtils.formatDate(query.submissionTime)}
+      </strong>
+      (<strong>{numBatches}</strong> completed batches)
+    </div>
+    <br />
+    <div><strong>Name: </strong>{name}</div>
+    <div><strong>Id: </strong>{query.id}</div>
+    <div><strong>RunId: </strong>{query.runId}</div>
+    <br />
+  }
+
+  /** The table of timeline/histogram charts, preceded by the JavaScript data it reads. */
+  def generateStatTable(query: StreamingQueryUIData): Seq[Node] = {
+    // Work on one snapshot of the retained progress so that every chart describes the same
+    // batches, even if new progress arrives while the page is being rendered.
+    val progresses = query.recentProgress
+    val batchTimes = progresses.map(p => parseTimestampMs(p.timestamp))
+    val minBatchTime = batchTimes.headOption.getOrElse(0L)
+    val maxBatchTime = batchTimes.lastOption.getOrElse(0L)
+
+    // Pairs every batch time with one metric of that batch; NaN/infinite values become 0.
+    def series(metric: StreamingQueryProgress => Double): Array[(Long, Double)] =
+      batchTimes.zip(progresses.map(p => withNumberInvalid(metric(p))))
+
+    val inputRateData = series(_.inputRowsPerSecond)
+    val processRateData = series(_.processedRowsPerSecond)
+    val inputRowsData = series(_.numInputRows.toDouble)
+    val batchDurations = series(_.batchDuration.toDouble)
+    val operationDurationData = batchTimes.zip(progresses.map { p =>
+      // Drop "triggerExecution", which spans the other operations. Remove it from a copy:
+      // `p.durationMs` belongs to the progress object retained by the status listener, so an
+      // in-place removal would also alter it for every later reader.
+      val durationMs: ju.Map[String, JLong] = new ju.HashMap[String, JLong](p.durationMs)
+      durationMs.remove("triggerExecution")
+      durationMs
+    })
+
+    // Every y-axis starts at zero. The upper bounds come from the sanitized series, so a NaN or
+    // infinite rate cannot become an axis bound.
+    val minY = 0.0
+    def maxY(data: Array[(Long, Double)]): Double = if (data.isEmpty) 0.0 else data.map(_._2).max
+
+    val jsCollector = new JsCollector
+    val graphUIDataForInputRate =
+      new GraphUIData(
+        "input-rate-timeline",
+        "input-rate-histogram",
+        inputRateData,
+        minBatchTime,
+        maxBatchTime,
+        minY,
+        maxY(inputRateData),
+        "records/sec")
+    graphUIDataForInputRate.generateDataJs(jsCollector)
+
+    val graphUIDataForProcessRate =
+      new GraphUIData(
+        "process-rate-timeline",
+        "process-rate-histogram",
+        processRateData,
+        minBatchTime,
+        maxBatchTime,
+        minY,
+        maxY(processRateData),
+        "records/sec")
+    graphUIDataForProcessRate.generateDataJs(jsCollector)
+
+    val graphUIDataForInputRows =
+      new GraphUIData(
+        "input-rows-timeline",
+        "input-rows-histogram",
+        inputRowsData,
+        minBatchTime,
+        maxBatchTime,
+        minY,
+        maxY(inputRowsData),
+        "records")
+    graphUIDataForInputRows.generateDataJs(jsCollector)
+
+    val graphUIDataForBatchDuration =
+      new GraphUIData(
+        "batch-duration-timeline",
+        "batch-duration-histogram",
+        batchDurations,
+        minBatchTime,
+        maxBatchTime,
+        minY,
+        maxY(batchDurations),
+        "ms")
+    graphUIDataForBatchDuration.generateDataJs(jsCollector)
+
+    val graphUIDataForDuration =
+      new GraphUIData(
+        "duration-area-stack",
+        "",
+        Seq.empty[(Long, Double)],
+        0L,
+        0L,
+        0L,
+        0L,
+        "ms")
+
+    val table =
+    // scalastyle:off
+      <table id="stat-table" class="table table-bordered" style="width: auto">
+        <thead>
+          <tr>
+            <th style="width: 160px;"></th>
+            <th style="width: 492px;">Timelines</th>
+            <th style="width: 350px;">Histograms</th></tr>
+        </thead>
+        <tbody>
+          <tr>
+            <td style="vertical-align: middle;">
+              <div style="width: 160px;">
+                <div><strong>Input Rate {SparkUIUtils.tooltip("The aggregate (across all sources) rate of data arriving.", "right")}</strong></div>
+              </div>
+            </td>
+            <td class="timeline">{graphUIDataForInputRate.generateTimelineHtml(jsCollector)}</td>
+            <td class="histogram">{graphUIDataForInputRate.generateHistogramHtml(jsCollector)}</td>
+          </tr>
+          <tr>
+            <td style="vertical-align: middle;">
+              <div style="width: 160px;">
+                <div><strong>Process Rate {SparkUIUtils.tooltip("The aggregate (across all sources) rate at which Spark is processing data.", "right")}</strong></div>
+              </div>
+            </td>
+            <td class="timeline">{graphUIDataForProcessRate.generateTimelineHtml(jsCollector)}</td>
+            <td class="histogram">{graphUIDataForProcessRate.generateHistogramHtml(jsCollector)}</td>
+          </tr>
+          <tr>
+            <td style="vertical-align: middle;">
+              <div style="width: 160px;">
+                <div><strong>Input Rows {SparkUIUtils.tooltip("The aggregate (across all sources) number of records processed in a trigger.", "right")}</strong></div>
+              </div>
+            </td>
+            <td class="timeline">{graphUIDataForInputRows.generateTimelineHtml(jsCollector)}</td>
+            <td class="histogram">{graphUIDataForInputRows.generateHistogramHtml(jsCollector)}</td>
+          </tr>
+          <tr>
+            <td style="vertical-align: middle;">
+              <div style="width: 160px;">
+                <div><strong>Batch Duration {SparkUIUtils.tooltip("The process duration of each batch.", "right")}</strong></div>
+              </div>
+            </td>
+            <td class="timeline">{graphUIDataForBatchDuration.generateTimelineHtml(jsCollector)}</td>
+            <td class="histogram">{graphUIDataForBatchDuration.generateHistogramHtml(jsCollector)}</td>
+          </tr>
+          <tr>
+            <td style="vertical-align: middle;">
+              <div style="width: auto;">
+                <div><strong>Operation Duration {SparkUIUtils.tooltip("The amount of time taken to perform various operations in milliseconds.", "right")}</strong></div>
+              </div>
+            </td>
+            <td class="duration-area-stack" colspan="2">{graphUIDataForDuration.generateAreaStackHtmlWithData(jsCollector, operationDurationData)}</td>
+          </tr>
+        </tbody>
+      </table>
+    // scalastyle:on
+
+    generateVar(operationDurationData) ++ generateTimeMap(batchTimes) ++ table ++ jsCollector.toHtml
+  }
+}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
new file mode 100644
index 0000000..db085db
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import java.text.SimpleDateFormat
+import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
+
+/**
+ * A StreamingQueryListener that collects the data shown by the Structured Streaming UI for both
+ * active and inactive queries.
+ * TODO: Add support for history server.
+ */
+private[sql] class StreamingQueryStatusListener(sqlConf: SQLConf) extends StreamingQueryListener {
+
+  private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+  timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
+
+  /**
+   * We use runId as the key here instead of id in active query status map,
+   * because the runId is unique for every started query, even if it is a restart.
+   */
+  private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, StreamingQueryUIData]()
+  /** Terminated queries in termination order (oldest first); guarded by `this`. */
+  private[ui] val inactiveQueryStatus = new mutable.Queue[StreamingQueryUIData]()
+
+  /** Maximum number of progress updates retained per query. */
+  private val streamingProgressRetention = sqlConf.streamingProgressRetention
+  /** Maximum number of terminated queries retained. */
+  private val inactiveQueryStatusRetention = sqlConf.streamingUIInactiveQueryRetention
+
+  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
+    activeQueryStatus.putIfAbsent(event.runId,
+      new StreamingQueryUIData(event.name, event.id, event.runId, event.submissionTime))
+  }
+
+  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
+    val batchTimestamp = timestampFormat.parse(event.progress.timestamp).getTime
+    // NOTE(review): the fallback record built below is never put into `activeQueryStatus`, so
+    // progress of a run whose start this listener has not seen is effectively dropped.
+    // Confirm whether it should be registered instead (e.g. with `putIfAbsent`).
+    val queryStatus = activeQueryStatus.getOrDefault(
+      event.progress.runId,
+      new StreamingQueryUIData(event.progress.name, event.progress.id, event.progress.runId,
+        batchTimestamp))
+    queryStatus.updateProcess(event.progress, streamingProgressRetention)
+  }
+
+  override def onQueryTerminated(
+      event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized {
+    val queryStatus = activeQueryStatus.remove(event.runId)
+    if (queryStatus != null) {
+      queryStatus.queryTerminated(event)
+      inactiveQueryStatus += queryStatus
+      // Evict the oldest entries so that at most `inactiveQueryStatusRetention` remain.
+      while (inactiveQueryStatus.length > inactiveQueryStatusRetention) {
+        inactiveQueryStatus.dequeue()
+      }
+    }
+  }
+
+  /**
+   * A snapshot of every tracked query: active runs (in no particular order) followed by inactive
+   * runs in termination order. The result is built eagerly while the lock is held: on Scala 2.12
+   * `Iterable.toSeq` returns a lazy `Stream`, which would read the mutable queue only after the
+   * lock has been released.
+   */
+  def allQueryStatus: Seq[StreamingQueryUIData] = synchronized {
+    activeQueryStatus.values().asScala.toList ++ inactiveQueryStatus
+  }
+}
+
+/**
+ * UI state of a single run of a [[org.apache.spark.sql.streaming.StreamingQuery]]: its
+ * identity, its submission time, whether it is still active, the error message it terminated
+ * with (if any) and its most recent progress updates.
+ */
+private[ui] class StreamingQueryUIData(
+    val name: String,
+    val id: UUID,
+    val runId: UUID,
+    val submissionTime: Long) {
+
+  /** Holds the most recent query progress updates; guarded by its own monitor. */
+  private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
+
+  // Termination state; guarded by this object's monitor.
+  private var _isActive = true
+  private var _exception: Option[String] = None
+
+  def isActive: Boolean = synchronized { _isActive }
+
+  def exception: Option[String] = synchronized { _exception }
+
+  /** Marks the run as inactive and records the error message it terminated with, if any. */
+  def queryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized {
+    _isActive = false
+    _exception = event.exception
+  }
+
+  /**
+   * Appends `newProgress`, then evicts the oldest updates so that at most `retentionNum`
+   * remain.
+   */
+  def updateProcess(
+      newProgress: StreamingQueryProgress, retentionNum: Int): Unit = progressBuffer.synchronized {
+    progressBuffer += newProgress
+    while (progressBuffer.length > retentionNum) {
+      progressBuffer.dequeue()
+    }
+  }
+
+  /** A copy of the retained progress updates, oldest first. */
+  def recentProgress: Array[StreamingQueryProgress] = progressBuffer.synchronized {
+    progressBuffer.toArray
+  }
+
+  /** The most recent progress update, or `null` if none has been received yet. */
+  def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
+    progressBuffer.lastOption.orNull
+  }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
similarity index 54%
copy from streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
copy to sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
index 3ecc448..f909cfd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
@@ -14,35 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
*/
-
-package org.apache.spark.streaming.ui
+package org.apache.spark.sql.streaming.ui
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.ui.{SparkUI, SparkUITab}
 
-/**
- * Spark Web UI tab that shows statistics of a streaming job.
- * This assumes the given SparkContext has enabled its SparkUI.
- */
-private[spark] class StreamingTab(val ssc: StreamingContext, sparkUI: SparkUI)
-  extends SparkUITab(sparkUI, "streaming") with Logging {
+private[sql] class StreamingQueryTab(
+    val statusListener: StreamingQueryStatusListener,
+    sparkUI: SparkUI) extends SparkUITab(sparkUI, "StreamingQuery") with Logging {
 
-  private val STATIC_RESOURCE_DIR = "org/apache/spark/streaming/ui/static"
+  override val name = "Structured Streaming"
 
   val parent = sparkUI
-  val listener = ssc.progressListener
 
-  attachPage(new StreamingPage(this))
-  attachPage(new BatchPage(this))
+  attachPage(new StreamingQueryPage(this))
+  attachPage(new StreamingQueryStatisticsPage(this))
+  parent.attachTab(this)
 
-  def attach(): Unit = {
-    parent.attachTab(this)
-    parent.addStaticHandler(STATIC_RESOURCE_DIR, "/static/streaming")
-  }
+  parent.addStaticHandler(StreamingQueryTab.STATIC_RESOURCE_DIR, "/static/sql")
+}
 
-  def detach(): Unit = {
-    parent.detachTab(this)
-    parent.detachHandler("/static/streaming")
-  }
+object StreamingQueryTab {
+  private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static"
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
new file mode 100644
index 0000000..57b9dec
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+/** Helpers shared by the Structured Streaming UI pages. */
+private[ui] object UIUtils {
+
+  /**
+   * Returns `number`, or 0.0 if it is NaN or infinite. The by-name argument is evaluated
+   * exactly once.
+   */
+  def withNumberInvalid(number: => Double): Double = {
+    val value = number
+    if (value.isNaN || value.isInfinite) 0.0d else value
+  }
+
+  /**
+   * Evaluates `body` if `query` has at least one retained progress update; otherwise returns
+   * `default` without evaluating `body`.
+   */
+  def withNoProgress[T](query: StreamingQueryUIData, body: => T, default: T): T = {
+    if (query.lastProgress != null) body else default
+  }
+
+  /** The user-specified name of `query`, or "<no name>" if it is null or empty. */
+  def getQueryName(query: StreamingQueryUIData): String = {
+    Option(query.name).filter(_.nonEmpty).getOrElse("<no name>")
+  }
+
+  /** "RUNNING", "FAILED" (terminated with an error) or "FINISHED". */
+  def getQueryStatus(query: StreamingQueryUIData): String = {
+    if (query.isActive) {
+      "RUNNING"
+    } else {
+      query.exception.map(_ => "FAILED").getOrElse("FINISHED")
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 2f66dd32..9d0f829 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -23,7 +23,6 @@ import scala.collection.mutable
 
 import org.scalactic.TolerantNumerics
 import org.scalatest.BeforeAndAfter
-import org.scalatest.PrivateMethodTester._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.concurrent.Waiters.Waiter
 
@@ -34,6 +33,7 @@ import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQueryListener._
+import org.apache.spark.sql.streaming.ui.StreamingQueryStatusListener
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.util.JsonProtocol
 
@@ -47,7 +47,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   after {
     spark.streams.active.foreach(_.stop())
     assert(spark.streams.active.isEmpty)
-    assert(spark.streams.listListeners().isEmpty)
+    // Ignore the default `StreamingQueryStatusListener`, which is registered for the streaming UI.
+    assert(spark.streams.listListeners()
+      .filterNot(_.isInstanceOf[StreamingQueryStatusListener]).isEmpty)
     // Make sure we don't leak any events to the next test
     spark.sparkContext.listenerBus.waitUntilEmpty()
   }
@@ -252,8 +254,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
       assert(newEvent.name === event.name)
     }
 
-    testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, "name"))
-    testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, null))
+    testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, "name", 1L))
+    testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, null, 1L))
   }
 
   test("QueryProgressEvent serialization") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index b6a6be2..6f00b52 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -312,6 +312,7 @@ object StreamingQueryStatusAndProgressSuite {
     name = "myName",
     timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
+    batchDuration = 0L,
     durationMs = new java.util.HashMap(Map("total" -> 0L).mapValues(long2Long).asJava),
     eventTime = new java.util.HashMap(Map(
       "max" -> "2016-12-05T20:54:20.827Z",
@@ -346,6 +347,7 @@ object StreamingQueryStatusAndProgressSuite {
     name = null, // should not be present in the json
     timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
+    batchDuration = 0L,
     durationMs = new java.util.HashMap(Map("total" -> 0L).mapValues(long2Long).asJava),
     // empty maps should be handled correctly
     eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 4121f49..77f5c85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -466,7 +466,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       val streamingTriggerDF = spark.createDataset(1 to 10).toDF
       val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value")
 
-      val progress = getFirstProgress(streamingInputDF.join(streamingInputDF, "value"))
+      val progress = getStreamingQuery(streamingInputDF.join(streamingInputDF, "value"))
+        .recentProgress.head
       assert(progress.numInputRows === 20) // data is read multiple times in self-joins
       assert(progress.sources.size === 1)
       assert(progress.sources(0).numInputRows === 20)
@@ -479,7 +480,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
 
       // Trigger input has 10 rows, static input has 2 rows,
       // therefore after the first trigger, the calculated input rows should be 10
-      val progress = getFirstProgress(streamingInputDF.join(staticInputDF, "value"))
+      val progress = getStreamingQuery(streamingInputDF.join(staticInputDF, "value"))
+        .recentProgress.head
       assert(progress.numInputRows === 10)
       assert(progress.sources.size === 1)
       assert(progress.sources(0).numInputRows === 10)
@@ -492,7 +494,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF)
 
       // After the first trigger, the calculated input rows should be 10
-      val progress = getFirstProgress(streamingInputDF)
+      val progress = getStreamingQuery(streamingInputDF).recentProgress.head
       assert(progress.numInputRows === 10)
       assert(progress.sources.size === 1)
       assert(progress.sources(0).numInputRows === 10)
@@ -1120,12 +1122,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     StreamingExecutionRelation(source, spark)
   }
 
-  /** Returns the query progress at the end of the first trigger of streaming DF */
-  private def getFirstProgress(streamingDF: DataFrame): StreamingQueryProgress = {
+  /** Runs `streamingDF` until all available data is processed; returns the stopped query. */
+  private def getStreamingQuery(streamingDF: DataFrame): StreamingQuery = {
     try {
       val q = streamingDF.writeStream.format("memory").queryName("test").start()
       q.processAllAvailable()
-      q.recentProgress.head
+      q
     } finally {
       spark.streams.active.map(_.stop())
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
new file mode 100644
index 0000000..de43e47
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.sql.streaming.ui + +import java.util.{Locale, UUID} +import javax.servlet.http.HttpServletRequest + +import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS} +import org.scalatest.BeforeAndAfter +import scala.xml.Node + +import org.apache.spark.sql.streaming.StreamingQueryProgress +import org.apache.spark.sql.test.SharedSparkSession + +class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter { + + test("correctly display streaming query page") { + val id = UUID.randomUUID() + val request = mock(classOf[HttpServletRequest]) + val tab = mock(classOf[StreamingQueryTab], RETURNS_SMART_NULLS) + val statusListener = mock(classOf[StreamingQueryStatusListener], RETURNS_SMART_NULLS) + when(tab.appName).thenReturn("testing") + when(tab.headerTabs).thenReturn(Seq.empty) + when(tab.statusListener).thenReturn(statusListener) + + val streamQuery = createStreamQueryUIData(id) + when(statusListener.allQueryStatus).thenReturn(Seq(streamQuery)) + var html = renderStreamingQueryPage(request, tab) + .toString().toLowerCase(Locale.ROOT) + assert(html.contains("active streaming queries (1)")) + assert(html.contains("completed streaming queries (0)")) + + when(streamQuery.isActive).thenReturn(false) + when(streamQuery.exception).thenReturn(None) + html = renderStreamingQueryPage(request, tab) + .toString().toLowerCase(Locale.ROOT) + assert(html.contains("active streaming queries (0)")) + assert(html.contains("completed streaming queries (1)")) + assert(html.contains("finished")) + + when(streamQuery.isActive).thenReturn(false) + when(streamQuery.exception).thenReturn(Option("exception in query")) + html = renderStreamingQueryPage(request, tab) + .toString().toLowerCase(Locale.ROOT) + assert(html.contains("active streaming queries (0)")) + assert(html.contains("completed streaming queries (1)")) + assert(html.contains("failed")) + assert(html.contains("exception in query")) + } + + test("correctly display streaming query statistics 
page") { + val id = UUID.randomUUID() + val request = mock(classOf[HttpServletRequest]) + val tab = mock(classOf[StreamingQueryTab], RETURNS_SMART_NULLS) + val statusListener = mock(classOf[StreamingQueryStatusListener], RETURNS_SMART_NULLS) + when(request.getParameter("id")).thenReturn(id.toString) + when(tab.appName).thenReturn("testing") + when(tab.headerTabs).thenReturn(Seq.empty) + when(tab.statusListener).thenReturn(statusListener) + + val streamQuery = createStreamQueryUIData(id) + when(statusListener.allQueryStatus).thenReturn(Seq(streamQuery)) + val html = renderStreamingQueryStatisticsPage(request, tab) + .toString().toLowerCase(Locale.ROOT) + + assert(html.contains("<strong>name: </strong>query<")) + assert(html.contains("""{"x": 1001898000100, "y": 10.0}""")) + assert(html.contains("""{"x": 1001898000100, "y": 12.0}""")) + assert(html.contains("(<strong>3</strong> completed batches)")) + } + + private def createStreamQueryUIData(id: UUID): StreamingQueryUIData = { + val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS) + when(progress.timestamp).thenReturn("2001-10-01T01:00:00.100Z") + when(progress.inputRowsPerSecond).thenReturn(10.0) + when(progress.processedRowsPerSecond).thenReturn(12.0) + when(progress.batchId).thenReturn(2) + when(progress.prettyJson).thenReturn("""{"a":1}""") + + val streamQuery = mock(classOf[StreamingQueryUIData], RETURNS_SMART_NULLS) + when(streamQuery.isActive).thenReturn(true) + when(streamQuery.name).thenReturn("query") + when(streamQuery.id).thenReturn(id) + when(streamQuery.runId).thenReturn(id) + when(streamQuery.submissionTime).thenReturn(1L) + when(streamQuery.lastProgress).thenReturn(progress) + when(streamQuery.recentProgress).thenReturn(Array(progress)) + when(streamQuery.exception).thenReturn(None) + + streamQuery + } + + /** + * Render a stage page started with the given conf and return the HTML. + * This also runs a dummy execution page to populate the page with useful content. 
+ */ + private def renderStreamingQueryPage( + request: HttpServletRequest, + tab: StreamingQueryTab): Seq[Node] = { + val page = new StreamingQueryPage(tab) + page.render(request) + } + + private def renderStreamingQueryStatisticsPage( + request: HttpServletRequest, + tab: StreamingQueryTab): Seq[Node] = { + val page = new StreamingQueryStatisticsPage(tab) + page.render(request) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala new file mode 100644 index 0000000..bd74ed3 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming.ui + +import java.util.UUID + +import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS} + +import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress, StreamTest} +import org.apache.spark.sql.streaming + +class StreamingQueryStatusListenerSuite extends StreamTest { + + test("onQueryStarted, onQueryProgress, onQueryTerminated") { + val listener = new StreamingQueryStatusListener(spark.sqlContext.conf) + + // hanlde query started event + val id = UUID.randomUUID() + val runId = UUID.randomUUID() + val startEvent = new StreamingQueryListener.QueryStartedEvent(id, runId, "test", 1L) + listener.onQueryStarted(startEvent) + + // result checking + assert(listener.activeQueryStatus.size() == 1) + assert(listener.activeQueryStatus.get(runId).name == "test") + + // handle query progress event + val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS) + when(progress.id).thenReturn(id) + when(progress.runId).thenReturn(runId) + when(progress.timestamp).thenReturn("2001-10-01T01:00:00.100Z") + when(progress.inputRowsPerSecond).thenReturn(10.0) + when(progress.processedRowsPerSecond).thenReturn(12.0) + when(progress.batchId).thenReturn(2) + when(progress.prettyJson).thenReturn("""{"a":1}""") + val processEvent = new streaming.StreamingQueryListener.QueryProgressEvent(progress) + listener.onQueryProgress(processEvent) + + // result checking + val activeQuery = listener.activeQueryStatus.get(runId) + assert(activeQuery.isActive) + assert(activeQuery.recentProgress.length == 1) + assert(activeQuery.lastProgress.id == id) + assert(activeQuery.lastProgress.runId == runId) + assert(activeQuery.lastProgress.timestamp == "2001-10-01T01:00:00.100Z") + assert(activeQuery.lastProgress.inputRowsPerSecond == 10.0) + assert(activeQuery.lastProgress.processedRowsPerSecond == 12.0) + assert(activeQuery.lastProgress.batchId == 2) + assert(activeQuery.lastProgress.prettyJson == """{"a":1}""") + + // 
handle terminate event + val terminateEvent = new StreamingQueryListener.QueryTerminatedEvent(id, runId, None) + listener.onQueryTerminated(terminateEvent) + + assert(!listener.inactiveQueryStatus.head.isActive) + assert(listener.inactiveQueryStatus.head.runId == runId) + assert(listener.inactiveQueryStatus.head.id == id) + } + + test("same query start multiple times") { + val listener = new StreamingQueryStatusListener(spark.sqlContext.conf) + + // handle first time start + val id = UUID.randomUUID() + val runId0 = UUID.randomUUID() + val startEvent0 = new StreamingQueryListener.QueryStartedEvent(id, runId0, "test", 1L) + listener.onQueryStarted(startEvent0) + + // handle terminate event + val terminateEvent0 = new StreamingQueryListener.QueryTerminatedEvent(id, runId0, None) + listener.onQueryTerminated(terminateEvent0) + + // handle second time start + val runId1 = UUID.randomUUID() + val startEvent1 = new StreamingQueryListener.QueryStartedEvent(id, runId1, "test", 1L) + listener.onQueryStarted(startEvent1) + + // result checking + assert(listener.activeQueryStatus.size() == 1) + assert(listener.inactiveQueryStatus.length == 1) + assert(listener.activeQueryStatus.containsKey(runId1)) + assert(listener.activeQueryStatus.get(runId1).id == id) + assert(listener.inactiveQueryStatus.head.runId == runId0) + assert(listener.inactiveQueryStatus.head.id == id) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UIUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UIUtilsSuite.scala new file mode 100644 index 0000000..46f2ead --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UIUtilsSuite.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming.ui + +import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS} +import org.scalatest.Matchers + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.streaming.StreamingQueryProgress + +class UIUtilsSuite extends SparkFunSuite with Matchers { + test("streaming query started with no batch completed") { + val query = mock(classOf[StreamingQueryUIData], RETURNS_SMART_NULLS) + when(query.lastProgress).thenReturn(null) + + assert(0 == UIUtils.withNoProgress(query, 1, 0)) + } + + test("streaming query started with at least one batch completed") { + val query = mock(classOf[StreamingQueryUIData], RETURNS_SMART_NULLS) + val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS) + when(query.lastProgress).thenReturn(progress) + + assert(1 == UIUtils.withNoProgress(query, 1, 0)) + } +} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala index adfda0c5..890a668 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala @@ -375,21 +375,7 @@ private[ui] class SqlStatsPagedTable( } else { 
errorMessage }) - val details = if (isMultiline) { - // scalastyle:off - <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')" - class="expand-details"> - + details - </span> ++ - <div class="stacktrace-details collapsed"> - <pre> - {errorMessage} - </pre> - </div> - // scalastyle:on - } else { - "" - } + val details = detailsUINode(isMultiline, errorMessage) <td> {errorSummary}{details} </td> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index e360b4a..6c981b2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -33,7 +33,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext.rddToFileName import org.apache.spark.streaming.scheduler.Job -import org.apache.spark.streaming.ui.UIUtils +import org.apache.spark.ui.{UIUtils => SparkUIUtils} import org.apache.spark.util.{CallSite, Utils} /** @@ -138,7 +138,7 @@ abstract class DStream[T: ClassTag] ( */ private def makeScope(time: Time): Option[RDDOperationScope] = { baseScope.map { bsJson => - val formattedBatchTime = UIUtils.formatBatchTime( + val formattedBatchTime = SparkUIUtils.formatBatchTime( time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false) val bs = RDDOperationScope.fromJson(bsJson) val baseName = bs.name // e.g. 
countByWindow, "kafka stream [0]" diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 5d543c5..7eea57c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -28,7 +28,7 @@ import org.apache.spark.internal.io.SparkHadoopWriterUtils import org.apache.spark.rdd.RDD import org.apache.spark.streaming._ import org.apache.spark.streaming.api.python.PythonDStream -import org.apache.spark.streaming.ui.UIUtils +import org.apache.spark.ui.{UIUtils => SparkUIUtils} import org.apache.spark.util.{EventLoop, ThreadUtils, Utils} @@ -230,7 +230,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { val oldProps = ssc.sparkContext.getLocalProperties try { ssc.sparkContext.setLocalProperties(Utils.cloneProperties(ssc.savedProperties.get())) - val formattedTime = UIUtils.formatBatchTime( + val formattedTime = SparkUIUtils.formatBatchTime( job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false) val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}" val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]" diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala index f1070e9..b5a0e92 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala @@ -51,7 +51,7 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) protected def baseRow(batch: BatchUIData): Seq[Node] = { val batchTime = batch.batchTime.milliseconds - val formattedBatchTime = UIUtils.formatBatchTime(batchTime, batchInterval) + val 
formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval) val numRecords = batch.numRecords val schedulingDelay = batch.schedulingDelay val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala index 2c85d26..04cd063 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala @@ -325,7 +325,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { throw new IllegalArgumentException(s"Missing id parameter") } val formattedBatchTime = - UIUtils.formatBatchTime(batchTime.milliseconds, streamingListener.batchDuration) + SparkUIUtils.formatBatchTime(batchTime.milliseconds, streamingListener.batchDuration) val batchUIData = streamingListener.getBatchUIData(batchTime).getOrElse { throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 31ebb4c..d47287b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -20,78 +20,10 @@ package org.apache.spark.streaming.ui import java.util.concurrent.TimeUnit import javax.servlet.http.HttpServletRequest -import scala.collection.mutable.ArrayBuffer import scala.xml.{Node, Unparsed} import org.apache.spark.internal.Logging -import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage} - -/** - * A helper class to generate JavaScript and HTML for both timeline and histogram graphs. 
- * - * @param timelineDivId the timeline `id` used in the html `div` tag - * @param histogramDivId the timeline `id` used in the html `div` tag - * @param data the data for the graph - * @param minX the min value of X axis - * @param maxX the max value of X axis - * @param minY the min value of Y axis - * @param maxY the max value of Y axis - * @param unitY the unit of Y axis - * @param batchInterval if `batchInterval` is not None, we will draw a line for `batchInterval` in - * the graph - */ -private[ui] class GraphUIData( - timelineDivId: String, - histogramDivId: String, - data: Seq[(Long, Double)], - minX: Long, - maxX: Long, - minY: Double, - maxY: Double, - unitY: String, - batchInterval: Option[Double] = None) { - - private var dataJavaScriptName: String = _ - - def generateDataJs(jsCollector: JsCollector): Unit = { - val jsForData = data.map { case (x, y) => - s"""{"x": $x, "y": $y}""" - }.mkString("[", ",", "]") - dataJavaScriptName = jsCollector.nextVariableName - jsCollector.addPreparedStatement(s"var $dataJavaScriptName = $jsForData;") - } - - def generateTimelineHtml(jsCollector: JsCollector): Seq[Node] = { - jsCollector.addPreparedStatement(s"registerTimeline($minY, $maxY);") - if (batchInterval.isDefined) { - jsCollector.addStatement( - "drawTimeline(" + - s"'#$timelineDivId', $dataJavaScriptName, $minX, $maxX, $minY, $maxY, '$unitY'," + - s" ${batchInterval.get}" + - ");") - } else { - jsCollector.addStatement( - s"drawTimeline('#$timelineDivId', $dataJavaScriptName, $minX, $maxX, $minY, $maxY," + - s" '$unitY');") - } - <div id={timelineDivId}></div> - } - - def generateHistogramHtml(jsCollector: JsCollector): Seq[Node] = { - val histogramData = s"$dataJavaScriptName.map(function(d) { return d.y; })" - jsCollector.addPreparedStatement(s"registerHistogram($histogramData, $minY, $maxY);") - if (batchInterval.isDefined) { - jsCollector.addStatement( - "drawHistogram(" + - s"'#$histogramDivId', $histogramData, $minY, $maxY, '$unitY', 
${batchInterval.get}" + - ");") - } else { - jsCollector.addStatement( - s"drawHistogram('#$histogramDivId', $histogramData, $minY, $maxY, '$unitY');") - } - <div id={histogramDivId}></div> - } -} +import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage} /** * A helper class for "scheduling delay", "processing time" and "total delay" to generate data that @@ -164,8 +96,8 @@ private[ui] class StreamingPage(parent: StreamingTab) private def generateLoadResources(request: HttpServletRequest): Seq[Node] = { // scalastyle:off <script src={SparkUIUtils.prependBaseUri(request, "/static/d3.min.js")}></script> - <link rel="stylesheet" href={SparkUIUtils.prependBaseUri(request, "/static/streaming/streaming-page.css")} type="text/css"/> - <script src={SparkUIUtils.prependBaseUri(request, "/static/streaming/streaming-page.js")}></script> + <link rel="stylesheet" href={SparkUIUtils.prependBaseUri(request, "/static/streaming-page.css")} type="text/css"/> + <script src={SparkUIUtils.prependBaseUri(request, "/static/streaming-page.js")}></script> // scalastyle:on } @@ -201,7 +133,7 @@ private[ui] class StreamingPage(parent: StreamingTab) private def generateTimeMap(times: Seq[Long]): Seq[Node] = { val js = "var timeFormat = {};\n" + times.map { time => val formattedTime = - UIUtils.formatBatchTime(time, listener.batchDuration, showYYYYMMSS = false) + SparkUIUtils.formatBatchTime(time, listener.batchDuration, showYYYYMMSS = false) s"timeFormat[$time] = '$formattedTime';" }.mkString("\n") @@ -544,52 +476,3 @@ private[ui] object StreamingPage { } -/** - * A helper class that allows the user to add JavaScript statements which will be executed when the - * DOM has finished loading. 
- */ -private[ui] class JsCollector { - - private var variableId = 0 - - /** - * Return the next unused JavaScript variable name - */ - def nextVariableName: String = { - variableId += 1 - "v" + variableId - } - - /** - * JavaScript statements that will execute before `statements` - */ - private val preparedStatements = ArrayBuffer[String]() - - /** - * JavaScript statements that will execute after `preparedStatements` - */ - private val statements = ArrayBuffer[String]() - - def addPreparedStatement(js: String): Unit = { - preparedStatements += js - } - - def addStatement(js: String): Unit = { - statements += js - } - - /** - * Generate a html snippet that will execute all scripts when the DOM has finished loading. - */ - def toHtml: Seq[Node] = { - val js = - s""" - |$$(document).ready(function() { - | ${preparedStatements.mkString("\n")} - | ${statements.mkString("\n")} - |});""".stripMargin - - <script>{Unparsed(js)}</script> - } -} - diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala index 3ecc448..d616b47 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala @@ -28,7 +28,7 @@ import org.apache.spark.ui.{SparkUI, SparkUITab} private[spark] class StreamingTab(val ssc: StreamingContext, sparkUI: SparkUI) extends SparkUITab(sparkUI, "streaming") with Logging { - private val STATIC_RESOURCE_DIR = "org/apache/spark/streaming/ui/static" + private val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static" val parent = sparkUI val listener = ssc.progressListener diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala index c21912a..dc1af0a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala +++ 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala @@ -17,14 +17,14 @@ package org.apache.spark.streaming.ui -import java.text.SimpleDateFormat -import java.util.{Locale, TimeZone} import java.util.concurrent.TimeUnit import scala.xml.Node import org.apache.commons.text.StringEscapeUtils +import org.apache.spark.ui.{ UIUtils => SparkUIUtils } + private[streaming] object UIUtils { /** @@ -78,59 +78,6 @@ private[streaming] object UIUtils { case TimeUnit.DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0 } - // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use. - private val batchTimeFormat = new ThreadLocal[SimpleDateFormat]() { - override def initialValue(): SimpleDateFormat = - new SimpleDateFormat("yyyy/MM/dd HH:mm:ss", Locale.US) - } - - private val batchTimeFormatWithMilliseconds = new ThreadLocal[SimpleDateFormat]() { - override def initialValue(): SimpleDateFormat = - new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS", Locale.US) - } - - /** - * If `batchInterval` is less than 1 second, format `batchTime` with milliseconds. Otherwise, - * format `batchTime` without milliseconds. - * - * @param batchTime the batch time to be formatted - * @param batchInterval the batch interval - * @param showYYYYMMSS if showing the `yyyy/MM/dd` part. 
If it's false, the return value wll be - * only `HH:mm:ss` or `HH:mm:ss.SSS` depending on `batchInterval` - * @param timezone only for test - */ - def formatBatchTime( - batchTime: Long, - batchInterval: Long, - showYYYYMMSS: Boolean = true, - timezone: TimeZone = null): String = { - val oldTimezones = - (batchTimeFormat.get.getTimeZone, batchTimeFormatWithMilliseconds.get.getTimeZone) - if (timezone != null) { - batchTimeFormat.get.setTimeZone(timezone) - batchTimeFormatWithMilliseconds.get.setTimeZone(timezone) - } - try { - val formattedBatchTime = - if (batchInterval < 1000) { - batchTimeFormatWithMilliseconds.get.format(batchTime) - } else { - // If batchInterval >= 1 second, don't show milliseconds - batchTimeFormat.get.format(batchTime) - } - if (showYYYYMMSS) { - formattedBatchTime - } else { - formattedBatchTime.substring(formattedBatchTime.indexOf(' ') + 1) - } - } finally { - if (timezone != null) { - batchTimeFormat.get.setTimeZone(oldTimezones._1) - batchTimeFormatWithMilliseconds.get.setTimeZone(oldTimezones._2) - } - } - } - def createOutputOperationFailureForUI(failure: String): String = { if (failure.startsWith("org.apache.spark.Spark")) { // SparkException or SparkDriverExecutionException @@ -164,19 +111,7 @@ private[streaming] object UIUtils { } else { failureReason } - val details = if (isMultiline) { - // scalastyle:off - <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')" - class="expand-details"> - +details - </span> ++ - <div class="stacktrace-details collapsed"> - <pre>{failureDetails}</pre> - </div> - // scalastyle:on - } else { - "" - } + val details = SparkUIUtils.detailsUINode(isMultiline, failureDetails) if (rowspan == 1) { <td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td> diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala index 
1bb4116..36036fc 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala @@ -19,12 +19,10 @@ package org.apache.spark.streaming import scala.collection.mutable.ArrayBuffer -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} - import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.rdd.{RDD, RDDOperationScope} import org.apache.spark.streaming.dstream.DStream -import org.apache.spark.streaming.ui.UIUtils +import org.apache.spark.ui.{UIUtils => SparkUIUtils} import org.apache.spark.util.ManualClock /** @@ -214,7 +212,7 @@ class DStreamScopeSuite rddScope: RDDOperationScope, batchTime: Long): Unit = { val (baseScopeId, baseScopeName) = (baseScope.id, baseScope.name) - val formattedBatchTime = UIUtils.formatBatchTime( + val formattedBatchTime = SparkUIUtils.formatBatchTime( batchTime, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false) assert(rddScope.id === s"${baseScopeId}_$batchTime") assert(rddScope.name.replaceAll("\\n", " ") === s"$baseScopeName @ $formattedBatchTime") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala index d3ca2b5..5760837 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit import org.scalatest.Matchers import org.apache.spark.SparkFunSuite +import org.apache.spark.ui.{UIUtils => SparkUIUtils} class UIUtilsSuite extends SparkFunSuite with Matchers{ @@ -70,10 +71,13 @@ class UIUtilsSuite extends SparkFunSuite with Matchers{ test("formatBatchTime") { val tzForTest = TimeZone.getTimeZone("America/Los_Angeles") val batchTime = 1431637480452L // Thu May 14 14:04:40 PDT 2015 - assert("2015/05/14 14:04:40" === 
UIUtils.formatBatchTime(batchTime, 1000, timezone = tzForTest)) + assert("2015/05/14 14:04:40" === + SparkUIUtils.formatBatchTime(batchTime, 1000, timezone = tzForTest)) assert("2015/05/14 14:04:40.452" === - UIUtils.formatBatchTime(batchTime, 999, timezone = tzForTest)) - assert("14:04:40" === UIUtils.formatBatchTime(batchTime, 1000, false, timezone = tzForTest)) - assert("14:04:40.452" === UIUtils.formatBatchTime(batchTime, 999, false, timezone = tzForTest)) + SparkUIUtils.formatBatchTime(batchTime, 999, timezone = tzForTest)) + assert("14:04:40" === + SparkUIUtils.formatBatchTime(batchTime, 1000, false, timezone = tzForTest)) + assert("14:04:40.452" === + SparkUIUtils.formatBatchTime(batchTime, 999, false, timezone = tzForTest)) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org