This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c0d430  Add metrics for FilterExec (#960)
4c0d430 is described below

commit 4c0d4301c1210462500ee1d01bf29e462259753e
Author: Andrew Lamb <and...@nerdnetworks.org>
AuthorDate: Fri Sep 10 11:47:10 2021 -0400

    Add metrics for FilterExec (#960)
---
 datafusion/src/physical_plan/filter.rs | 25 ++++++++++++++++++++++---
 datafusion/tests/sql.rs                | 14 +++++++++++++-
 2 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/datafusion/src/physical_plan/filter.rs b/datafusion/src/physical_plan/filter.rs
index 9e7fa9d..52017c6 100644
--- a/datafusion/src/physical_plan/filter.rs
+++ b/datafusion/src/physical_plan/filter.rs
@@ -26,6 +26,7 @@ use std::task::{Context, Poll};
 use super::{RecordBatchStream, SendableRecordBatchStream};
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{
+    metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
     DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
 };
 use arrow::array::BooleanArray;
@@ -46,6 +47,8 @@ pub struct FilterExec {
     predicate: Arc<dyn PhysicalExpr>,
     /// The input plan
     input: Arc<dyn ExecutionPlan>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl FilterExec {
@@ -58,6 +61,7 @@ impl FilterExec {
             DataType::Boolean => Ok(Self {
                 predicate,
                 input: input.clone(),
+                metrics: ExecutionPlanMetricsSet::new(),
             }),
             other => Err(DataFusionError::Plan(format!(
                 "Filter predicate must return boolean values, not {:?}",
@@ -115,10 +119,13 @@ impl ExecutionPlan for FilterExec {
     }
 
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+
         Ok(Box::pin(FilterExecStream {
             schema: self.input.schema().clone(),
             predicate: self.predicate.clone(),
             input: self.input.execute(partition).await?,
+            baseline_metrics,
         }))
     }
 
@@ -133,6 +140,10 @@ impl ExecutionPlan for FilterExec {
             }
         }
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
 }
 
 /// The FilterExec streams wraps the input iterator and applies the predicate expression to
@@ -144,6 +155,8 @@ struct FilterExecStream {
     predicate: Arc<dyn PhysicalExpr>,
     /// The input partition to filter.
     input: SendableRecordBatchStream,
+    /// runtime metrics recording
+    baseline_metrics: BaselineMetrics,
 }
 
 fn batch_filter(
@@ -176,10 +189,16 @@ impl Stream for FilterExecStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        self.input.poll_next_unpin(cx).map(|x| match x {
-            Some(Ok(batch)) => Some(batch_filter(&batch, &self.predicate)),
+        let poll = self.input.poll_next_unpin(cx).map(|x| match x {
+            Some(Ok(batch)) => {
+                let timer = self.baseline_metrics.elapsed_compute().timer();
+                let filtered_batch = batch_filter(&batch, &self.predicate);
+                timer.done();
+                Some(filtered_batch)
+            }
             other => other,
-        })
+        });
+        self.baseline_metrics.record_poll(poll)
     }
 
     fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 804ae7e..ccff292 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -2247,7 +2247,14 @@ async fn explain_analyze_baseline_metrics() {
     let mut ctx = ExecutionContext::with_config(config);
     register_aggregate_csv_by_sql(&mut ctx).await;
     // a query with as many operators as we have metrics for
-    let sql = "EXPLAIN ANALYZE select count(*) from (SELECT count(*), c1 FROM aggregate_test_100 group by c1 ORDER BY c1)";
+    let sql = "EXPLAIN ANALYZE \
+               select count(*) from \
+               (SELECT count(*), c1 \
+               FROM aggregate_test_100 \
+               WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434' \
+               GROUP BY c1 \
+               ORDER BY c1)";
+    println!("running query: {}", sql);
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
     let physical_plan = ctx.create_physical_plan(&plan).unwrap();
@@ -2275,6 +2282,11 @@ async fn explain_analyze_baseline_metrics() {
         "SortExec: [c1@0 ASC]",
         "metrics=[output_rows=5, elapsed_compute="
     );
+    assert_metrics!(
+        &formatted,
+        "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
+        "metrics=[output_rows=99, elapsed_compute="
+    );
 
     fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool {
         use datafusion::physical_plan::{
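
For readers skimming the patch, the sketch below (not part of this commit) shows one way the metrics() hook added above could be consumed after a plan has run. It assumes the DataFusion API of this release (datafusion::physical_plan::collect, plus the output_rows()/elapsed_compute() accessors on MetricsSet); the helper name print_root_metrics is made up for illustration.

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::{collect, ExecutionPlan};

// Illustrative sketch only: run the plan, then read the root operator's
// metrics via the ExecutionPlan::metrics() hook this commit implements
// for FilterExec.
async fn print_root_metrics(plan: Arc<dyn ExecutionPlan>) -> Result<()> {
    // Run the plan to completion so each partition's BaselineMetrics
    // (output_rows, elapsed_compute) get recorded.
    let _batches = collect(plan.clone()).await?;

    // Operators without instrumentation return None; after this commit,
    // FilterExec returns Some(MetricsSet) built from clone_inner().
    if let Some(metrics) = plan.metrics() {
        println!(
            "output_rows={:?}, elapsed_compute={:?}",
            metrics.output_rows(),
            metrics.elapsed_compute(),
        );
    }
    Ok(())
}

These are the same counters that EXPLAIN ANALYZE surfaces in the test change above, formatted as metrics=[output_rows=..., elapsed_compute=...].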
