[GitHub] [arrow] alamb commented on a change in pull request #7975: ARROW-9758: [Rust] [DataFusion] Allow physical planner to be replaced

2020-08-17 Thread GitBox


alamb commented on a change in pull request #7975:
URL: https://github.com/apache/arrow/pull/7975#discussion_r471437170



##
File path: rust/datafusion/src/execution/context.rs
##
@@ -373,363 +355,12 @@ impl ExecutionContext {
 pub fn create_physical_plan(
 &self,
 logical_plan: &LogicalPlan,
-batch_size: usize,
 ) -> Result<Arc<dyn ExecutionPlan>> {
-match logical_plan {
-LogicalPlan::TableScan {
-table_name,
-projection,
-..
-} => match self
-.state
-.lock()
-.expect("failed to lock mutex")
-.datasources
-.lock()
-.expect("failed to lock mutex")
-.get(table_name)
-{
-Some(provider) => {
-let partitions = provider.scan(projection, batch_size)?;
-if partitions.is_empty() {
-Err(ExecutionError::General(
-"Table provider returned no partitions".to_string(),
-))
-} else {
-let schema = match projection {
-None => provider.schema().clone(),
-Some(p) => Arc::new(Schema::new(
-p.iter()
-.map(|i| provider.schema().field(*i).clone())
-.collect(),
-)),
-};
-
-let exec = DatasourceExec::new(schema, partitions.clone());
-Ok(Arc::new(exec))
-}
-}
-_ => Err(ExecutionError::General(format!(
-"No table named {}",
-table_name
-))),
-},
-LogicalPlan::InMemoryScan {
-data,
-projection,
-projected_schema,
-..
-} => Ok(Arc::new(MemoryExec::try_new(
-data,
-Arc::new(projected_schema.as_ref().to_owned()),
-projection.to_owned(),
-)?)),
-LogicalPlan::CsvScan {
-path,
-schema,
-has_header,
-delimiter,
-projection,
-..
-} => Ok(Arc::new(CsvExec::try_new(
-path,
-CsvReadOptions::new()
-.schema(schema.as_ref())
-.delimiter_option(*delimiter)
-.has_header(*has_header),
-projection.to_owned(),
-batch_size,
-)?)),
-LogicalPlan::ParquetScan {
-path, projection, ..
-} => Ok(Arc::new(ParquetExec::try_new(
-path,
-projection.to_owned(),
-batch_size,
-)?)),
-LogicalPlan::Projection { input, expr, .. } => {
-let input = self.create_physical_plan(input, batch_size)?;
-let input_schema = input.as_ref().schema().clone();
-let runtime_expr = expr
-.iter()
-.map(|e| {
-tuple_err((
-self.create_physical_expr(e, &input_schema),
-e.name(&input_schema),
-))
-})
-.collect::<Result<Vec<_>>>()?;
-Ok(Arc::new(ProjectionExec::try_new(runtime_expr, input)?))
-}
-LogicalPlan::Aggregate {
-input,
-group_expr,
-aggr_expr,
-..
-} => {
-// Initially need to perform the aggregate and then merge the partitions
-let input = self.create_physical_plan(input, batch_size)?;
-let input_schema = input.as_ref().schema().clone();
-
-let groups = group_expr
-.iter()
-.map(|e| {
-tuple_err((
-self.create_physical_expr(e, &input_schema),
-e.name(&input_schema),
-))
-})
-.collect::<Result<Vec<_>>>()?;
-let aggregates = aggr_expr
-.iter()
-.map(|e| {
-tuple_err((
-self.create_aggregate_expr(e, &input_schema),
-e.name(&input_schema),
-))
-})
-.collect::<Result<Vec<_>>>()?;
-
-let initial_aggr = HashAggregateExec::try_new(
-groups.clone(),
-aggregates.clone(),
-input,
-)?;
-
-let schema = 

[GitHub] [arrow] alamb commented on a change in pull request #7975: ARROW-9758: [Rust] [DataFusion] Allow physical planner to be replaced

2020-08-17 Thread GitBox


alamb commented on a change in pull request #7975:
URL: https://github.com/apache/arrow/pull/7975#discussion_r471433074



##
File path: rust/datafusion/src/execution/context.rs
##
@@ -1452,11 +1109,34 @@ mod tests {
 Ok(())
 }
 
+#[test]
+fn custom_physical_planner() -> Result<()> {

Review comment:
    

##
File path: rust/datafusion/src/execution/context.rs
##
@@ -373,363 +355,12 @@ impl ExecutionContext {
 pub fn create_physical_plan(
 &self,
 logical_plan: &LogicalPlan,
-batch_size: usize,
 ) -> Result<Arc<dyn ExecutionPlan>> {
-match logical_plan {
-LogicalPlan::TableScan {
-table_name,
-projection,
-..
-} => match self
-.state
-.lock()
-.expect("failed to lock mutex")
-.datasources
-.lock()
-.expect("failed to lock mutex")
-.get(table_name)
-{
-Some(provider) => {
-let partitions = provider.scan(projection, batch_size)?;
-if partitions.is_empty() {
-Err(ExecutionError::General(
-"Table provider returned no partitions".to_string(),
-))
-} else {
-let schema = match projection {
-None => provider.schema().clone(),
-Some(p) => Arc::new(Schema::new(
-p.iter()
-.map(|i| provider.schema().field(*i).clone())
-.collect(),
-)),
-};
-
-let exec = DatasourceExec::new(schema, partitions.clone());
-Ok(Arc::new(exec))
-}
-}
-_ => Err(ExecutionError::General(format!(
-"No table named {}",
-table_name
-))),
-},
-LogicalPlan::InMemoryScan {
-data,
-projection,
-projected_schema,
-..
-} => Ok(Arc::new(MemoryExec::try_new(
-data,
-Arc::new(projected_schema.as_ref().to_owned()),
-projection.to_owned(),
-)?)),
-LogicalPlan::CsvScan {
-path,
-schema,
-has_header,
-delimiter,
-projection,
-..
-} => Ok(Arc::new(CsvExec::try_new(
-path,
-CsvReadOptions::new()
-.schema(schema.as_ref())
-.delimiter_option(*delimiter)
-.has_header(*has_header),
-projection.to_owned(),
-batch_size,
-)?)),
-LogicalPlan::ParquetScan {
-path, projection, ..
-} => Ok(Arc::new(ParquetExec::try_new(
-path,
-projection.to_owned(),
-batch_size,
-)?)),
-LogicalPlan::Projection { input, expr, .. } => {
-let input = self.create_physical_plan(input, batch_size)?;
-let input_schema = input.as_ref().schema().clone();
-let runtime_expr = expr
-.iter()
-.map(|e| {
-tuple_err((
-self.create_physical_expr(e, &input_schema),
-e.name(&input_schema),
-))
-})
-.collect::<Result<Vec<_>>>()?;
-Ok(Arc::new(ProjectionExec::try_new(runtime_expr, input)?))
-}
-LogicalPlan::Aggregate {
-input,
-group_expr,
-aggr_expr,
-..
-} => {
-// Initially need to perform the aggregate and then merge the partitions
-let input = self.create_physical_plan(input, batch_size)?;
-let input_schema = input.as_ref().schema().clone();
-
-let groups = group_expr
-.iter()
-.map(|e| {
-tuple_err((
-self.create_physical_expr(e, &input_schema),
-e.name(&input_schema),
-))
-})
-.collect::<Result<Vec<_>>>()?;
-let aggregates = aggr_expr
-.iter()
-.map(|e| {
-tuple_err((
-self.create_aggregate_expr(e, &input_schema),
-e.name(&input_schema),
-))
-})
-.collect::<Result<Vec<_>>>()?;
-
-