HeartSaVioR opened a new pull request #27046: [SPARK-29348][SQL][FOLLOWUP] Add 
example of batch query for observable metrics
URL: https://github.com/apache/spark/pull/27046
 
 
   ### What changes were proposed in this pull request?
   
   This patch adds the example of batch query for observable metrics in 
`Dataset.observe`, as well as fixes a small bug in the example of streaming 
query.
   
   ### Why are the changes needed?
   
   The doc of `Dataset.observe` lacks the example for batch query, and it's 
requested by https://github.com/apache/spark/pull/26127#discussion_r361886847
   
   ### Does this PR introduce any user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Ran the two queries below via spark-shell:
   
   > batch
   
   ```
   import scala.util.Random
   import org.apache.spark.sql.execution.QueryExecution
   import org.apache.spark.sql.util.QueryExecutionListener
   
   spark.listenerManager.register(new QueryExecutionListener() {
     override def onSuccess(funcName: String, qe: QueryExecution, durationNs: 
Long): Unit = {
       qe.observedMetrics.get("my_event").foreach { row =>
         // Trigger if the number of errors exceeds 5 percent
         val num_rows = row.getAs[Long]("rc")
         val num_error_rows = row.getAs[Long]("erc")
         val ratio = num_error_rows.toDouble / num_rows
         if (ratio > 0.05) {
           // Trigger alert
           println(s"alert! error ratio: $ratio")
         }
       }
     }
   
     override def onFailure(funcName: String, qe: QueryExecution, error: 
Throwable): Unit = {
       // No-op
     }
   })
   
   
   val rand = new Random()
   val df = (0 to 10).map { idx => (idx, if (rand.nextBoolean()) "error" else 
null ) }.toDF
   val ds = df.selectExpr("_1 AS id", "_2 AS error")
   
   // Observe row count (rc) and error row count (erc) in the batch Dataset
   val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), 
count($"error").as("erc"))
   observed_ds.collect()
   ```
   
   > streaming
   
   ```
   import scala.collection.JavaConverters._
   import scala.util.Random
   import org.apache.spark.sql.streaming.StreamingQueryListener
   import org.apache.spark.sql.streaming.StreamingQueryListener._
   
   spark.streams.addListener(new StreamingQueryListener() {
     override def onQueryProgress(event: QueryProgressEvent): Unit = {
       event.progress.observedMetrics.asScala.get("my_event").foreach { row =>
         // Trigger if the number of errors exceeds 5 percent
         val num_rows = row.getAs[Long]("rc")
         val num_error_rows = row.getAs[Long]("erc")
         val ratio = num_error_rows.toDouble / num_rows
         if (ratio > 0.05) {
           // Trigger alert
           println(s"alert! error ratio: $ratio")
         }
       }
     }
   
     def onQueryStarted(event: QueryStartedEvent): Unit = {}
     def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
   })
   
   val rates = spark
     .readStream
     .format("rate")
     .option("rowsPerSecond", 10)
     .load
   
   val rand = new Random()
   val df = rates.map { row => (row.getLong(1), if (row.getLong(1) % 2 == 0) 
"error" else null ) }.toDF
   val ds = df.selectExpr("_1 AS id", "_2 AS error")
    // Observe row count (rc) and error row count (erc) in the streaming Dataset
   val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), 
count($"error").as("erc"))
   observed_ds.writeStream.format("console").start()
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to