HeartSaVioR commented on a change in pull request #27046: 
[SPARK-29348][SQL][FOLLOWUP] Add example of batch query for observable metrics
URL: https://github.com/apache/spark/pull/27046#discussion_r361895840
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
 ##########
 @@ -1891,14 +1891,34 @@ class Dataset[T] private[sql](
   * [[org.apache.spark.sql.util.QueryExecutionListener]] to the spark session.
   *
   * {{{
-  *   // Observe row count (rc) and error row count (erc) in the streaming Dataset
+  *   // Monitor the metrics for batch query using a listener.
+  *   spark.listenerManager.register(new QueryExecutionListener() {
+  *     override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+  *       qe.observedMetrics.get("my_event").foreach { row =>
+  *         // Trigger if the number of errors exceeds 5 percent
+  *         val num_rows = row.getAs[Long]("rc")
+  *         val num_error_rows = row.getAs[Long]("erc")
+  *         val ratio = num_error_rows.toDouble / num_rows
+  *         if (ratio > 0.05) {
+  *           // Trigger alert
+  *         }
+  *       }
+  *     }
+  *     override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
+  *       // No-op
+  *     }
+  *   })
+  *
+  *   // Observe row count (rc) and error row count (erc) in the batch Dataset
   *   val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
-  *   observed_ds.writeStream.format("...").start()
+  *   observed_ds.collect()
+  * }}}
   *
-  *   // Monitor the metrics using a listener.
+  * {{{
+  *   // Monitor the metrics for streaming query using a listener.
   *   spark.streams.addListener(new StreamingQueryListener() {
   *     override def onQueryProgress(event: QueryProgressEvent): Unit = {
-  *       event.progress.observedMetrics.get("my_event").foreach { row =>
+  *       event.progress.observedMetrics.asScala.get("my_event").foreach { row =>
 
 Review comment:
   This was a minor bug in the example. Unlike the `observedMetrics` available in
QueryExecutionListener (a Scala Map), `event.progress.observedMetrics` is a Java Map.
A Java Map's `get` returns the value itself (or null) rather than an `Option`, so
`foreach` cannot be called on the result. `asScala` is needed to convert it to a
Scala Map first.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to