srowen commented on a change in pull request #27046:
[SPARK-29348][SQL][FOLLOWUP] Fix slight bug on streaming example for Dataset.observe
URL: https://github.com/apache/spark/pull/27046#discussion_r362011825
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
 ##########
 @@ -1891,14 +1891,34 @@ class Dataset[T] private[sql](
   * [[org.apache.spark.sql.util.QueryExecutionListener]] to the spark session.
   *
   * {{{
-  *   // Observe row count (rc) and error row count (erc) in the streaming Dataset
+  *   // Monitor the metrics for batch query using a listener.
+  *   spark.listenerManager.register(new QueryExecutionListener() {
+  *     override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+  *       qe.observedMetrics.get("my_event").foreach { row =>
+  *         // Trigger if the number of errors exceeds 5 percent
+  *         val num_rows = row.getAs[Long]("rc")
+  *         val num_error_rows = row.getAs[Long]("erc")
+  *         val ratio = num_error_rows.toDouble / num_rows
+  *         if (ratio > 0.05) {
+  *           // Trigger alert
+  *         }
+  *       }
+  *     }
+  *     override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {
+  *       // No-op
+  *     }
+  *   })
+  *
+  *   // Observe row count (rc) and error row count (erc) in the batch Dataset
   *   val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
-  *   observed_ds.writeStream.format("...").start()
+  *   observed_ds.collect()
+  * }}}
   *
-  *   // Monitor the metrics using a listener.
+  * {{{
+  *   // Monitor the metrics for streaming query using a listener.
   *   spark.streams.addListener(new StreamingQueryListener() {
   *     override def onQueryProgress(event: QueryProgressEvent): Unit = {
-  *       event.progress.observedMetrics.get("my_event").foreach { row =>
+  *       event.progress.observedMetrics.asScala.get("my_event").foreach { row =>
 
 Review comment:
  Sounds fine. The alternative is to wrap it in `Option(....get(...)).foreach`, but that's messier.
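For context: `event.progress.observedMetrics` is a Java `java.util.Map[String, Row]`, so its own `get` returns `null` for a missing key, which is why the diff adds `.asScala` before `.get(...)`. The minimal sketch below compares that approach with the `Option(...)` wrapping mentioned in the review. It runs without Spark: the hypothetical `ObservedMetricsLookup` object and its `java.lang.Long` values stand in for the real `Row`-valued map. It assumes Scala 2.12, where `scala.collection.JavaConverters` is the usual import (on 2.13 it still compiles but is deprecated in favor of `scala.jdk.CollectionConverters`).

```scala
import java.util.{HashMap => JHashMap, Map => JMap}
import scala.collection.JavaConverters._

object ObservedMetricsLookup {
  // Hypothetical stand-in for StreamingQueryProgress.observedMetrics (a Java map).
  // Values are plain java.lang.Long here instead of Row, to keep the sketch Spark-free.
  val metrics: JMap[String, java.lang.Long] = {
    val m = new JHashMap[String, java.lang.Long]()
    m.put("my_event", 42L)
    m
  }

  // Approach taken in the PR: view the Java map as a Scala map, whose get returns an Option.
  def viaAsScala(key: String): Option[Long] =
    metrics.asScala.get(key).map(_.longValue)

  // Alternative from the review: Java's get returns null for an absent key,
  // so wrapping the result in Option(...) turns null into None.
  def viaOptionWrap(key: String): Option[Long] =
    Option(metrics.get(key)).map(_.longValue)

  def main(args: Array[String]): Unit = {
    viaAsScala("my_event").foreach(v => println(s"asScala: $v"))
    viaOptionWrap("my_event").foreach(v => println(s"Option: $v"))
    // For a missing key both return None, so a chained foreach does nothing.
    println(viaAsScala("missing"))
    println(viaOptionWrap("missing"))
  }
}
```

Both helpers return `Some(42)` for `"my_event"` and `None` for an absent key, so `.foreach` is a no-op when the metric was never reported. One subtle difference: for a key that is present but mapped to `null`, the `asScala` wrapper returns `Some(null)`, whereas `Option(...)` returns `None`.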

----------------------------------------------------------------
This is an automated message from the Apache Git Service.