This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push: new 4c0d430 Add metrics for FilterExec (#960) 4c0d430 is described below commit 4c0d4301c1210462500ee1d01bf29e462259753e Author: Andrew Lamb <and...@nerdnetworks.org> AuthorDate: Fri Sep 10 11:47:10 2021 -0400 Add metrics for FilterExec (#960) --- datafusion/src/physical_plan/filter.rs | 25 ++++++++++++++++++++++--- datafusion/tests/sql.rs | 14 +++++++++++++- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/datafusion/src/physical_plan/filter.rs b/datafusion/src/physical_plan/filter.rs index 9e7fa9d..52017c6 100644 --- a/datafusion/src/physical_plan/filter.rs +++ b/datafusion/src/physical_plan/filter.rs @@ -26,6 +26,7 @@ use std::task::{Context, Poll}; use super::{RecordBatchStream, SendableRecordBatchStream}; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ + metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, }; use arrow::array::BooleanArray; @@ -46,6 +47,8 @@ pub struct FilterExec { predicate: Arc<dyn PhysicalExpr>, /// The input plan input: Arc<dyn ExecutionPlan>, + /// Execution metrics + metrics: ExecutionPlanMetricsSet, } impl FilterExec { @@ -58,6 +61,7 @@ impl FilterExec { DataType::Boolean => Ok(Self { predicate, input: input.clone(), + metrics: ExecutionPlanMetricsSet::new(), }), other => Err(DataFusionError::Plan(format!( "Filter predicate must return boolean values, not {:?}", @@ -115,10 +119,13 @@ impl ExecutionPlan for FilterExec { } async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> { + let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + Ok(Box::pin(FilterExecStream { schema: self.input.schema().clone(), predicate: self.predicate.clone(), input: self.input.execute(partition).await?, + baseline_metrics, })) } @@ -133,6 +140,10 @@ impl ExecutionPlan for FilterExec { } } } + + fn metrics(&self) -> Option<MetricsSet> { + Some(self.metrics.clone_inner()) + } } /// The FilterExec streams wraps the input iterator and applies the predicate expression to @@ -144,6 +155,8 @@ struct FilterExecStream { predicate: Arc<dyn PhysicalExpr>, /// The input partition to filter. input: SendableRecordBatchStream, + /// runtime metrics recording + baseline_metrics: BaselineMetrics, } fn batch_filter( @@ -176,10 +189,16 @@ impl Stream for FilterExecStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - self.input.poll_next_unpin(cx).map(|x| match x { - Some(Ok(batch)) => Some(batch_filter(&batch, &self.predicate)), + let poll = self.input.poll_next_unpin(cx).map(|x| match x { + Some(Ok(batch)) => { + let timer = self.baseline_metrics.elapsed_compute().timer(); + let filtered_batch = batch_filter(&batch, &self.predicate); + timer.done(); + Some(filtered_batch) + } other => other, - }) + }); + self.baseline_metrics.record_poll(poll) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 804ae7e..ccff292 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -2247,7 +2247,14 @@ async fn explain_analyze_baseline_metrics() { let mut ctx = ExecutionContext::with_config(config); register_aggregate_csv_by_sql(&mut ctx).await; // a query with as many operators as we have metrics for - let sql = "EXPLAIN ANALYZE select count(*) from (SELECT count(*), c1 FROM aggregate_test_100 group by c1 ORDER BY c1)"; + let sql = "EXPLAIN ANALYZE \ + select count(*) from \ + (SELECT count(*), c1 \ + FROM aggregate_test_100 \ + WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434' \ + GROUP BY c1 \ + ORDER BY c1)"; + println!("running query: {}", sql); let plan = ctx.create_logical_plan(sql).unwrap(); let plan = ctx.optimize(&plan).unwrap(); let physical_plan = ctx.create_physical_plan(&plan).unwrap(); @@ -2275,6 +2282,11 @@ async fn explain_analyze_baseline_metrics() { "SortExec: [c1@0 ASC]", "metrics=[output_rows=5, elapsed_compute=" ); + assert_metrics!( + &formatted, + "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434", + "metrics=[output_rows=99, elapsed_compute=" + ); fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool { use datafusion::physical_plan::{