HeartSaVioR opened a new pull request #27046: [SPARK-29348][SQL][FOLLOWUP] Add example of batch query for observable metrics URL: https://github.com/apache/spark/pull/27046 ### What changes were proposed in this pull request? This patch adds the example of batch query for observable metrics in `Dataset.observe`, as well as fixes a small bug in the example of streaming query. ### Why are the changes needed? The doc of `Dataset.observe` lacks the example for batch query, and it's requested by https://github.com/apache/spark/pull/26127#discussion_r361886847 ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Ran below two queries via spark-shell: > batch ``` import scala.util.Random import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.util.QueryExecutionListener spark.listenerManager.register(new QueryExecutionListener() { override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { qe.observedMetrics.get("my_event").foreach { row => // Trigger if the number of errors exceeds 5 percent val num_rows = row.getAs[Long]("rc") val num_error_rows = row.getAs[Long]("erc") val ratio = num_error_rows.toDouble / num_rows if (ratio > 0.05) { // Trigger alert println(s"alert! error ratio: $ratio") } } } override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = { // No-op } }) val rand = new Random() val df = (0 to 10).map { idx => (idx, if (rand.nextBoolean()) "error" else null ) }.toDF val ds = df.selectExpr("_1 AS id", "_2 AS error") // Observe row count (rc) and error row count (erc) in the batch Dataset val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc")) observed_ds.collect() ``` > streaming ``` import scala.collection.JavaConverters._ import scala.util.Random import org.apache.spark.sql.streaming.StreamingQueryListener import org.apache.spark.sql.streaming.StreamingQueryListener._ spark.streams.addListener(new StreamingQueryListener() { override def onQueryProgress(event: QueryProgressEvent): Unit = { event.progress.observedMetrics.asScala.get("my_event").foreach { row => // Trigger if the number of errors exceeds 5 percent val num_rows = row.getAs[Long]("rc") val num_error_rows = row.getAs[Long]("erc") val ratio = num_error_rows.toDouble / num_rows if (ratio > 0.05) { // Trigger alert println(s"alert! error ratio: $ratio") } } } def onQueryStarted(event: QueryStartedEvent): Unit = {} def onQueryTerminated(event: QueryTerminatedEvent): Unit = {} }) val rates = spark .readStream .format("rate") .option("rowsPerSecond", 10) .load val rand = new Random() val df = rates.map { row => (row.getLong(1), if (row.getLong(1) % 2 == 0) "error" else null ) }.toDF val ds = df.selectExpr("_1 AS id", "_2 AS error") // Observe row count (rc) and error row count (erc) in the batch Dataset val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc")) observed_ds.writeStream.format("console").start() ```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
