[GitHub] [arrow] alamb commented on a change in pull request #7975: ARROW-9758: [Rust] [DataFusion] Allow physical planner to be replaced
alamb commented on a change in pull request #7975: URL: https://github.com/apache/arrow/pull/7975#discussion_r471437170 ## File path: rust/datafusion/src/execution/context.rs ## @@ -373,363 +355,12 @@ impl ExecutionContext { pub fn create_physical_plan( &self, logical_plan: &LogicalPlan, -batch_size: usize, ) -> Result<Arc<dyn ExecutionPlan>> { -match logical_plan { -LogicalPlan::TableScan { -table_name, -projection, -.. -} => match self -.state -.lock() -.expect("failed to lock mutex") -.datasources -.lock() -.expect("failed to lock mutex") -.get(table_name) -{ -Some(provider) => { -let partitions = provider.scan(projection, batch_size)?; -if partitions.is_empty() { -Err(ExecutionError::General( -"Table provider returned no partitions".to_string(), -)) -} else { -let schema = match projection { -None => provider.schema().clone(), -Some(p) => Arc::new(Schema::new( -p.iter() -.map(|i| provider.schema().field(*i).clone()) -.collect(), -)), -}; - -let exec = DatasourceExec::new(schema, partitions.clone()); -Ok(Arc::new(exec)) -} -} -_ => Err(ExecutionError::General(format!( -"No table named {}", -table_name -))), -}, -LogicalPlan::InMemoryScan { -data, -projection, -projected_schema, -.. -} => Ok(Arc::new(MemoryExec::try_new( -data, -Arc::new(projected_schema.as_ref().to_owned()), -projection.to_owned(), -)?)), -LogicalPlan::CsvScan { -path, -schema, -has_header, -delimiter, -projection, -.. -} => Ok(Arc::new(CsvExec::try_new( -path, -CsvReadOptions::new() -.schema(schema.as_ref()) -.delimiter_option(*delimiter) -.has_header(*has_header), -projection.to_owned(), -batch_size, -)?)), -LogicalPlan::ParquetScan { -path, projection, .. -} => Ok(Arc::new(ParquetExec::try_new( -path, -projection.to_owned(), -batch_size, -)?)), -LogicalPlan::Projection { input, expr, .. 
} => { -let input = self.create_physical_plan(input, batch_size)?; -let input_schema = input.as_ref().schema().clone(); -let runtime_expr = expr -.iter() -.map(|e| { -tuple_err(( -self.create_physical_expr(e, &input_schema), -e.name(&input_schema), -)) -}) -.collect::<Result<Vec<_>>>()?; -Ok(Arc::new(ProjectionExec::try_new(runtime_expr, input)?)) -} -LogicalPlan::Aggregate { -input, -group_expr, -aggr_expr, -.. -} => { -// Initially need to perform the aggregate and then merge the partitions -let input = self.create_physical_plan(input, batch_size)?; -let input_schema = input.as_ref().schema().clone(); - -let groups = group_expr -.iter() -.map(|e| { -tuple_err(( -self.create_physical_expr(e, &input_schema), -e.name(&input_schema), -)) -}) -.collect::<Result<Vec<_>>>()?; -let aggregates = aggr_expr -.iter() -.map(|e| { -tuple_err(( -self.create_aggregate_expr(e, &input_schema), -e.name(&input_schema), -)) -}) -.collect::<Result<Vec<_>>>()?; - -let initial_aggr = HashAggregateExec::try_new( -groups.clone(), -aggregates.clone(), -input, -)?; - -let schema =
[GitHub] [arrow] alamb commented on a change in pull request #7975: ARROW-9758: [Rust] [DataFusion] Allow physical planner to be replaced
alamb commented on a change in pull request #7975: URL: https://github.com/apache/arrow/pull/7975#discussion_r471433074 ## File path: rust/datafusion/src/execution/context.rs ## @@ -1452,11 +1109,34 @@ mod tests { Ok(()) } +#[test] +fn custom_physical_planner() -> Result<()> { Review comment: ## File path: rust/datafusion/src/execution/context.rs ## @@ -373,363 +355,12 @@ impl ExecutionContext { pub fn create_physical_plan( &self, logical_plan: &LogicalPlan, -batch_size: usize, ) -> Result<Arc<dyn ExecutionPlan>> { -match logical_plan { -LogicalPlan::TableScan { -table_name, -projection, -.. -} => match self -.state -.lock() -.expect("failed to lock mutex") -.datasources -.lock() -.expect("failed to lock mutex") -.get(table_name) -{ -Some(provider) => { -let partitions = provider.scan(projection, batch_size)?; -if partitions.is_empty() { -Err(ExecutionError::General( -"Table provider returned no partitions".to_string(), -)) -} else { -let schema = match projection { -None => provider.schema().clone(), -Some(p) => Arc::new(Schema::new( -p.iter() -.map(|i| provider.schema().field(*i).clone()) -.collect(), -)), -}; - -let exec = DatasourceExec::new(schema, partitions.clone()); -Ok(Arc::new(exec)) -} -} -_ => Err(ExecutionError::General(format!( -"No table named {}", -table_name -))), -}, -LogicalPlan::InMemoryScan { -data, -projection, -projected_schema, -.. -} => Ok(Arc::new(MemoryExec::try_new( -data, -Arc::new(projected_schema.as_ref().to_owned()), -projection.to_owned(), -)?)), -LogicalPlan::CsvScan { -path, -schema, -has_header, -delimiter, -projection, -.. -} => Ok(Arc::new(CsvExec::try_new( -path, -CsvReadOptions::new() -.schema(schema.as_ref()) -.delimiter_option(*delimiter) -.has_header(*has_header), -projection.to_owned(), -batch_size, -)?)), -LogicalPlan::ParquetScan { -path, projection, .. -} => Ok(Arc::new(ParquetExec::try_new( -path, -projection.to_owned(), -batch_size, -)?)), -LogicalPlan::Projection { input, expr, .. 
} => { -let input = self.create_physical_plan(input, batch_size)?; -let input_schema = input.as_ref().schema().clone(); -let runtime_expr = expr -.iter() -.map(|e| { -tuple_err(( -self.create_physical_expr(e, &input_schema), -e.name(&input_schema), -)) -}) -.collect::<Result<Vec<_>>>()?; -Ok(Arc::new(ProjectionExec::try_new(runtime_expr, input)?)) -} -LogicalPlan::Aggregate { -input, -group_expr, -aggr_expr, -.. -} => { -// Initially need to perform the aggregate and then merge the partitions -let input = self.create_physical_plan(input, batch_size)?; -let input_schema = input.as_ref().schema().clone(); - -let groups = group_expr -.iter() -.map(|e| { -tuple_err(( -self.create_physical_expr(e, &input_schema), -e.name(&input_schema), -)) -}) -.collect::<Result<Vec<_>>>()?; -let aggregates = aggr_expr -.iter() -.map(|e| { -tuple_err(( -self.create_aggregate_expr(e, &input_schema), -e.name(&input_schema), -)) -}) -.collect::<Result<Vec<_>>>()?; - -