[GitHub] [arrow] andygrove commented on a change in pull request #8709: ARROW-9555: [Rust] [DataFusion] Implement physical node for inner join
andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r527750363

## File path: rust/datafusion/src/physical_plan/hash_utils.rs ##
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Functionality used both on logical and physical plans
+
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::{Field, Schema};
+use std::collections::HashSet;
+
+/// All valid types of joins.
+#[derive(Clone, Debug)]
+pub enum JoinType {
+    /// Inner join
+    Inner,
+}
+
+/// The on clause of the join, as vector of (left, right) columns.
+pub type JoinOn<'a> = [(&'a str, &'a str)];
+
+/// Checks whether the schemas "left" and "right" and columns "on" represent a valid join.
+/// They are valid whenever their columns' intersection equals the set `on`
+pub fn check_join_is_valid(left: &Schema, right: &Schema, on: &JoinOn) -> Result<()> {
+    let left: HashSet<String> = left.fields().iter().map(|f| f.name().clone()).collect();
+    let right: HashSet<String> =
+        right.fields().iter().map(|f| f.name().clone()).collect();
+
+    check_join_set_is_valid(&left, &right, on)
+}
+
+/// Checks whether the sets left, right and on compose a valid join.
+/// They are valid whenever their intersection equals the set `on`
+fn check_join_set_is_valid(
+    left: &HashSet<String>,
+    right: &HashSet<String>,
+    on: &JoinOn,
+) -> Result<()> {
+    if on.len() == 0 {
+        return Err(DataFusionError::Plan(
+            "The 'on' clause of a join cannot be empty".to_string(),
+        ));
+    }
+    let on_left = &on.iter().map(|on| on.0.to_string()).collect::<HashSet<_>>();
+    let left_missing = left.difference(on_left).collect::<HashSet<_>>();

Review comment:
Most of the tests pass with this code:

```rust
let on_left = &on.iter().map(|on| on.0.to_string()).collect::<HashSet<_>>();
let left_keys = left
    .iter()
    .filter(|f| on_left.contains(*f))
    .map(|s| s.clone())
    .collect::<HashSet<_>>();
let left_missing = on_left.difference(&left_keys).collect::<HashSet<_>>();

let on_right = &on.iter().map(|on| on.1.to_string()).collect::<HashSet<_>>();
let right_keys = right
    .iter()
    .filter(|f| on_right.contains(*f))
    .map(|s| s.clone())
    .collect::<HashSet<_>>();
let right_missing = on_right.difference(&right_keys).collect::<HashSet<_>>();

if left_missing.len() > 0 || right_missing.len() > 0 {
    return Err(DataFusionError::Plan(format!(
        "The left or right side of the join does not have columns \"on\": \nMissing on the left: {:?}\nMissing on the right: {:?}",
        left_missing,
        right_missing,
    )));
};
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r527740558

## File path: rust/datafusion/src/physical_plan/hash_utils.rs ##
@@ -0,0 +1,150 @@
+//! Functionality used both on logical and physical plans
+
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::{Field, Schema};
+use std::collections::HashSet;
+
+/// All valid types of joins.
+#[derive(Clone, Debug)]
+pub enum JoinType {
+    /// Inner join
+    Inner,
+}
+
+/// The on clause of the join, as vector of (left, right) columns.
+pub type JoinOn<'a> = [(&'a str, &'a str)];
+
+/// Checks whether the schemas "left" and "right" and columns "on" represent a valid join.
+/// They are valid whenever their columns' intersection equals the set `on`
+pub fn check_join_is_valid(left: &Schema, right: &Schema, on: &JoinOn) -> Result<()> {
+    let left: HashSet<String> = left.fields().iter().map(|f| f.name().clone()).collect();
+    let right: HashSet<String> =
+        right.fields().iter().map(|f| f.name().clone()).collect();
+
+    check_join_set_is_valid(&left, &right, on)
+}
+
+/// Checks whether the sets left, right and on compose a valid join.
+/// They are valid whenever their intersection equals the set `on`
+fn check_join_set_is_valid(
+    left: &HashSet<String>,
+    right: &HashSet<String>,
+    on: &JoinOn,
+) -> Result<()> {
+    if on.len() == 0 {
+        return Err(DataFusionError::Plan(
+            "The 'on' clause of a join cannot be empty".to_string(),
+        ));
+    }
+    let on_left = &on.iter().map(|on| on.0.to_string()).collect::<HashSet<_>>();
+    let left_missing = left.difference(on_left).collect::<HashSet<_>>();

Review comment:
I'm confused here, and I think this code might not be doing what you intended. `left_missing` contains all the left fields except the fields referenced in the `on` clause, so this function will always fail unless both tables contain only join keys and no other columns.
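The following std-only sketch makes the point about `left_missing` concrete. It compares the set difference as written with the direction the check presumably intends. Plain `HashSet<String>` sets stand in for the schemas, and the helper names are hypothetical, not taken from the PR.

```rust
use std::collections::HashSet;

/// Hypothetical helper: turns a list of column names into an owned set.
fn names(cols: &[&str]) -> HashSet<String> {
    cols.iter().map(|c| c.to_string()).collect()
}

/// The difference as written in the diff: columns of `side` that are NOT join keys.
/// For any table with at least one non-key column this is non-empty,
/// so the join would always be rejected.
fn missing_as_written(side: &HashSet<String>, on_side: &HashSet<String>) -> Vec<String> {
    let mut v: Vec<String> = side.difference(on_side).cloned().collect();
    v.sort();
    v
}

/// The presumably intended check: join keys that do NOT exist on this side.
/// This is empty whenever every key is present, regardless of extra columns.
fn missing_intended(side: &HashSet<String>, on_side: &HashSet<String>) -> Vec<String> {
    let mut v: Vec<String> = on_side.difference(side).cloned().collect();
    v.sort();
    v
}
```

For a left table with columns `id` and `name` joined on `id`, `missing_as_written` reports `["name"]`, while `missing_intended` is empty. Only a key that is genuinely absent, such as `x`, shows up in the intended version.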
andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r527735046

## File path: rust/datafusion/src/physical_plan/hash_utils.rs ##
@@ -0,0 +1,150 @@
+//! Functionality used both on logical and physical plans
+
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::{Field, Schema};
+use std::collections::HashSet;
+
+/// All valid types of joins.
+#[derive(Clone, Debug)]
+pub enum JoinType {
+    /// Inner join
+    Inner,
+}
+
+/// The on clause of the join, as vector of (left, right) columns.
+pub type JoinOn<'a> = [(&'a str, &'a str)];
+
+/// Checks whether the schemas "left" and "right" and columns "on" represent a valid join.
+/// They are valid whenever their columns' intersection equals the set `on`
+pub fn check_join_is_valid(left: &Schema, right: &Schema, on: &JoinOn) -> Result<()> {
+    let left: HashSet<String> = left.fields().iter().map(|f| f.name().clone()).collect();
+    let right: HashSet<String> =
+        right.fields().iter().map(|f| f.name().clone()).collect();
+
+    check_join_set_is_valid(&left, &right, on)
+}
+
+/// Checks whether the sets left, right and on compose a valid join.
+/// They are valid whenever their intersection equals the set `on`
+fn check_join_set_is_valid(
+    left: &HashSet<String>,
+    right: &HashSet<String>,
+    on: &JoinOn,
+) -> Result<()> {
+    if on.len() == 0 {
+        return Err(DataFusionError::Plan(
+            "The 'on' clause of a join cannot be empty".to_string(),
+        ));
+    }
+    let on_left = &on.iter().map(|on| on.0.to_string()).collect::<HashSet<_>>();
+    let left_missing = left.difference(on_left).collect::<HashSet<_>>();
+
+    let on_right = &on.iter().map(|on| on.1.to_string()).collect::<HashSet<_>>();
+    let right_missing = right.difference(on_right).collect::<HashSet<_>>();
+
+    if (left_missing.len() > 0) | (right_missing.len() > 0) {
+        return Err(DataFusionError::Plan(format!(
+            "The left or right side of the join does not have columns \"on\": \nMissing on the left: {:?}\nMissing on the right: {:?}",
+            left_missing,
+            right_missing,
+        )));
+    };
+    Ok(())
+}
+
+/// Creates a schema for a join operation.
+/// The fields from the left side are first
+pub fn build_join_schema(
+    left: &Schema,
+    right: &Schema,
+    on: &JoinOn,
+    join_type: &JoinType,
+) -> Schema {
+    let fields: Vec<Field> = match join_type {
+        JoinType::Inner => {
+            let on_left = &on.iter().map(|on| on.0.to_string()).collect::<HashSet<_>>();
+            let on_right = &on.iter().map(|on| on.1.to_string()).collect::<HashSet<_>>();
+
+            // inner: all fields are there
+            let left_fields =
+                left.fields().iter().filter(|f| !on_left.contains(f.name()));
+
+            let right_fields = right
+                .fields()
+                .iter()
+                .filter(|f| !on_right.contains(f.name()));
+
+            // left then right
+            left_fields.chain(right_fields).map(|f| f.clone()).collect()
+        }
+    };
+    Schema::new(fields)

Review comment:
@jorgecarleitao I think the join keys are being dropped from the schema?
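Here is a minimal sketch of a schema builder that keeps the join keys instead of filtering them out. It uses a simplified `Field` struct in place of `arrow::datatypes::Field`, and both the struct and `inner_join_fields` are hypothetical.

```rust
/// Simplified stand-in for an Arrow field: only a name and a type label.
#[derive(Clone, Debug, PartialEq)]
struct Field {
    name: String,
    data_type: String,
}

/// Inner-join output fields that keep the join keys: every left field in order,
/// followed by every right field in order. Nothing is filtered out.
fn inner_join_fields(left: &[Field], right: &[Field]) -> Vec<Field> {
    left.iter().chain(right.iter()).cloned().collect()
}
```

With this layout a key that has the same name on both sides appears twice in the output. That naming question is what the other comments in this thread discuss.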
andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r527067914

## File path: rust/datafusion/src/physical_plan/hash_utils.rs ##
@@ -0,0 +1,144 @@
+//! Functionality used both on logical and physical plans
+
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::{Field, Schema};
+use std::collections::HashSet;
+
+/// All valid types of joins.
+#[derive(Clone, Debug)]
+pub enum JoinType {
+    /// Inner join
+    Inner,
+}
+
+/// Checks whether the schemas "left" and "right" and columns "on" represent a valid join.
+/// They are valid whenever their columns' intersection equals the set `on`
+pub fn check_join_is_valid(
+    left: &Schema,
+    right: &Schema,
+    on: &HashSet<String>,
+) -> Result<()> {
+    let left: HashSet<String> = left.fields().iter().map(|f| f.name().clone()).collect();
+    let right: HashSet<String> =
+        right.fields().iter().map(|f| f.name().clone()).collect();
+
+    check_join_set_is_valid(&left, &right, &on)
+}
+
+/// Checks whether the sets left, right and on compose a valid join.
+/// They are valid whenever their intersection equals the set `on`
+fn check_join_set_is_valid(
+    left: &HashSet<String>,
+    right: &HashSet<String>,
+    on: &HashSet<String>,
+) -> Result<()> {
+    if on.len() == 0 {
+        return Err(DataFusionError::Plan(
+            "The 'on' clause of a join cannot be empty".to_string(),
+        ));
+    }
+
+    let on_columns = on.iter().collect::<HashSet<_>>();
+    let common_columns = left.intersection(&right).collect::<HashSet<_>>();
+    let missing = on_columns
+        .difference(&common_columns)
+        .collect::<HashSet<_>>();
+    if missing.len() > 0 {
+        return Err(DataFusionError::Plan(format!(
+            "The left or right side of the join does not have columns {:?} columns on \"on\": \nLeft: {:?}\nRight: {:?}\nOn: {:?}",
+            missing,
+            left,
+            right,
+            on,
+        ).to_string()));
+    };
+    Ok(())
+}
+
+/// Creates a schema for a join operation.
+/// The fields "on" from the left side are always first
+pub fn build_join_schema(
+    left: &Schema,
+    right: &Schema,
+    on: &HashSet<String>,
+    join_type: &JoinType,
+) -> Schema {
+    let fields: Vec<Field> = match join_type {
+        JoinType::Inner => {
+            // inner: all fields are there
+
+            let on_fields = left.fields().iter().filter(|f| on.contains(f.name()));
+
+            let left_fields = left.fields().iter().filter(|f| !on.contains(f.name()));
+
+            let right_fields = right.fields().iter().filter(|f| !on.contains(f.name()));
+
+            // "on" are first by construction, then left, then right
+            on_fields
+                .chain(left_fields)
+                .chain(right_fields)
+                .map(|f| f.clone())
+                .collect()

Review comment:
I think you're right. This is simpler and forces the user to add aliases as required to avoid conflicts.
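One reading of "forces the user to add aliases" can be sketched as follows. If the join schema keeps every field without renaming, a name that appears on both sides is ambiguous, so a planner could reject it and ask for an alias. The conflict check and the `join_field_names` helper are assumptions for illustration, not code from the PR.

```rust
use std::collections::HashSet;

/// Hypothetical helper: builds the joined field list (left then right) and
/// reports any name that appears on both sides, so the user can alias it first.
fn join_field_names(left: &[&str], right: &[&str]) -> Result<Vec<String>, String> {
    let left_set: HashSet<&str> = left.iter().copied().collect();
    // Names present on both sides would be ambiguous in the output schema.
    let mut conflicts: Vec<&str> = right
        .iter()
        .copied()
        .filter(|n| left_set.contains(n))
        .collect();
    if !conflicts.is_empty() {
        conflicts.sort();
        return Err(format!(
            "ambiguous column names {:?}; add aliases before joining",
            conflicts
        ));
    }
    Ok(left.iter().chain(right.iter()).map(|n| n.to_string()).collect())
}
```

Joining `[id, name]` with `[id, city]` fails under this sketch. Aliasing the right side to `[b_id, city]` first succeeds.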
andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r527061979

## File path: rust/datafusion/src/physical_plan/hash_join.rs ##
@@ -0,0 +1,467 @@
+//! Defines the join plan for executing partitions in parallel and then joining the results
+//! into a set of partitions.
+
+use std::sync::Arc;
+use std::{
+    any::Any,
+    collections::{HashMap, HashSet},
+};
+
+use async_trait::async_trait;
+use futures::{Stream, StreamExt, TryStreamExt};
+
+use arrow::array::{make_array, Array, MutableArrayData};
+use arrow::datatypes::{Schema, SchemaRef};
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+
+use super::hash_utils::{build_join_schema, check_join_is_valid, JoinHow};
+use super::{expressions::col, hash_aggregate::create_key};
+use crate::error::{DataFusionError, Result};
+
+use super::{
+    group_scalar::GroupByScalar, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
+};
+
+// An index of (batch, row) uniquely identifying a row in a part.
+type Index = (usize, usize);
+// None represents a null (e.g. in case of a left join, some right indices are null)
+type JoinIndex = (usize, usize);
+// A mapping "on" value -> list of row indexes with this key's value
+// E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
+// for rows 3 and 8 from batch 0 and row 6 from batch 1.
+type JoinHashMap = HashMap<Vec<GroupByScalar>, Vec<Index>>;
+type JoinLeftData = (JoinHashMap, Vec<RecordBatch>);
+
+/// join execution plan executes partitions in parallel and combines them into a set of
+/// partitions.
+#[derive(Debug)]
+pub struct HashJoinExec {
+    /// left side
+    left: Arc<dyn ExecutionPlan>,
+    /// right side
+    right: Arc<dyn ExecutionPlan>,
+    /// Set of common columns used to join on
+    on: HashSet<String>,

Review comment:
Yes, this is an implicit projection that happens on the join expressions. In my experience, at least, it is usually not represented as a separate step in a query plan and is instead handled within the join operator itself. I agree that this would be equivalent in compute to having a separate projection in the plan. I would be fine with supporting only joins on columns in this PR and looking at support for arbitrary expressions as a separate issue. We can always work around the limitation by doing explicit projections before the join.
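The implicit projection can be sketched with plain Rust collections. The sketch builds the same shape of map as the `JoinHashMap` comment in the diff: key values mapped to `(batch, row)` indices. It does this by evaluating the `on` columns inside the join rather than in a separate projection node. It assumes a toy `Batch` type (column name to `Vec<i64>`) instead of Arrow `RecordBatch`es, and `i64` keys instead of `GroupByScalar`. Both `Batch` and `build_join_hash_map` are hypothetical.

```rust
use std::collections::HashMap;

/// (batch index, row index), mirroring the `Index` alias in the diff.
type Index = (usize, usize);

/// Hypothetical stand-in for a record batch: i64 columns looked up by name.
struct Batch {
    columns: HashMap<String, Vec<i64>>,
    num_rows: usize,
}

/// Maps each combination of join-key values to the rows that carry it.
/// Reading the `on` columns row by row is the "implicit projection":
/// the join extracts its keys itself instead of relying on an upstream projection.
fn build_join_hash_map(batches: &[Batch], on: &[&str]) -> HashMap<Vec<i64>, Vec<Index>> {
    let mut map: HashMap<Vec<i64>, Vec<Index>> = HashMap::new();
    for (batch_idx, batch) in batches.iter().enumerate() {
        for row in 0..batch.num_rows {
            // Panics if a key column is missing; assumes the join was validated first.
            let key: Vec<i64> = on.iter().map(|c| batch.columns[*c][row]).collect();
            map.entry(key).or_default().push((batch_idx, row));
        }
    }
    map
}
```

Supporting arbitrary key expressions would replace the column lookup with an expression evaluation per row, while leaving the map itself unchanged.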
andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r527028885

## File path: rust/datafusion/src/physical_plan/hash_utils.rs ##
@@ -0,0 +1,144 @@
+//! Functionality used both on logical and physical plans
+
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::{Field, Schema};
+use std::collections::HashSet;
+
+/// All valid types of joins.
+#[derive(Clone, Debug)]
+pub enum JoinType {
+    /// Inner join
+    Inner,
+}
+
+/// Checks whether the schemas "left" and "right" and columns "on" represent a valid join.
+/// They are valid whenever their columns' intersection equals the set `on`
+pub fn check_join_is_valid(
+    left: &Schema,
+    right: &Schema,
+    on: &HashSet<String>,
+) -> Result<()> {
+    let left: HashSet<String> = left.fields().iter().map(|f| f.name().clone()).collect();
+    let right: HashSet<String> =
+        right.fields().iter().map(|f| f.name().clone()).collect();
+
+    check_join_set_is_valid(&left, &right, &on)
+}
+
+/// Checks whether the sets left, right and on compose a valid join.
+/// They are valid whenever their intersection equals the set `on`
+fn check_join_set_is_valid(
+    left: &HashSet<String>,
+    right: &HashSet<String>,
+    on: &HashSet<String>,
+) -> Result<()> {
+    if on.len() == 0 {
+        return Err(DataFusionError::Plan(
+            "The 'on' clause of a join cannot be empty".to_string(),
+        ));
+    }
+
+    let on_columns = on.iter().collect::<HashSet<_>>();
+    let common_columns = left.intersection(&right).collect::<HashSet<_>>();
+    let missing = on_columns
+        .difference(&common_columns)
+        .collect::<HashSet<_>>();
+    if missing.len() > 0 {
+        return Err(DataFusionError::Plan(format!(
+            "The left or right side of the join does not have columns {:?} columns on \"on\": \nLeft: {:?}\nRight: {:?}\nOn: {:?}",
+            missing,
+            left,
+            right,
+            on,
+        ).to_string()));
+    };
+    Ok(())
+}
+
+/// Creates a schema for a join operation.
+/// The fields "on" from the left side are always first
+pub fn build_join_schema(
+    left: &Schema,
+    right: &Schema,
+    on: &HashSet<String>,
+    join_type: &JoinType,
+) -> Schema {
+    let fields: Vec<Field> = match join_type {
+        JoinType::Inner => {
+            // inner: all fields are there
+
+            let on_fields = left.fields().iter().filter(|f| on.contains(f.name()));
+
+            let left_fields = left.fields().iter().filter(|f| !on.contains(f.name()));
+
+            let right_fields = right.fields().iter().filter(|f| !on.contains(f.name()));
+
+            // "on" are first by construction, then left, then right
+            on_fields
+                .chain(left_fields)
+                .chain(right_fields)
+                .map(|f| f.clone())
+                .collect()

Review comment:
@jorgecarleitao Just wanted to make sure you saw this :point_up:
andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r527020685

## File path: rust/datafusion/src/physical_plan/hash_utils.rs ##
@@ -0,0 +1,144 @@
+//! Functionality used both on logical and physical plans
+
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::{Field, Schema};
+use std::collections::HashSet;
+
+/// All valid types of joins.
+#[derive(Clone, Debug)]
+pub enum JoinType {
+    /// Inner join
+    Inner,
+}
+
+/// Checks whether the schemas "left" and "right" and columns "on" represent a valid join.
+/// They are valid whenever their columns' intersection equals the set `on`
+pub fn check_join_is_valid(
+    left: &Schema,
+    right: &Schema,
+    on: &HashSet<String>,
+) -> Result<()> {
+    let left: HashSet<String> = left.fields().iter().map(|f| f.name().clone()).collect();
+    let right: HashSet<String> =
+        right.fields().iter().map(|f| f.name().clone()).collect();
+
+    check_join_set_is_valid(&left, &right, &on)
+}
+
+/// Checks whether the sets left, right and on compose a valid join.
+/// They are valid whenever their intersection equals the set `on`
+fn check_join_set_is_valid(
+    left: &HashSet<String>,
+    right: &HashSet<String>,
+    on: &HashSet<String>,
+) -> Result<()> {
+    if on.len() == 0 {
+        return Err(DataFusionError::Plan(
+            "The 'on' clause of a join cannot be empty".to_string(),
+        ));
+    }
+
+    let on_columns = on.iter().collect::<HashSet<_>>();
+    let common_columns = left.intersection(&right).collect::<HashSet<_>>();
+    let missing = on_columns
+        .difference(&common_columns)
+        .collect::<HashSet<_>>();
+    if missing.len() > 0 {
+        return Err(DataFusionError::Plan(format!(
+            "The left or right side of the join does not have columns {:?} columns on \"on\": \nLeft: {:?}\nRight: {:?}\nOn: {:?}",
+            missing,
+            left,
+            right,
+            on,
+        ).to_string()));
+    };
+    Ok(())
+}
+
+/// Creates a schema for a join operation.
+/// The fields "on" from the left side are always first
+pub fn build_join_schema(
+    left: &Schema,
+    right: &Schema,
+    on: &HashSet<String>,
+    join_type: &JoinType,
+) -> Schema {
+    let fields: Vec<Field> = match join_type {
+        JoinType::Inner => {
+            // inner: all fields are there
+
+            let on_fields = left.fields().iter().filter(|f| on.contains(f.name()));
+
+            let left_fields = left.fields().iter().filter(|f| !on.contains(f.name()));
+
+            let right_fields = right.fields().iter().filter(|f| !on.contains(f.name()));
+
+            // "on" are first by construction, then left, then right
+            on_fields
+                .chain(left_fields)
+                .chain(right_fields)
+                .map(|f| f.clone())
+                .collect()

Review comment:
Based on the previous example, the output schema of the join would simply have fields named "a.id", "a.name", "b.id", "b.name".
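This sketch assumes the two inputs are aliased `a` and `b`, as in the example the comment refers to. It shows how qualified output names could be derived so that same-named columns no longer collide. The helper is hypothetical and uses plain strings rather than Arrow fields.

```rust
/// Hypothetical helper: prefixes each field with its relation alias, so that
/// same-named columns from both inputs stay distinct in the join output.
fn qualified_join_fields(
    left_alias: &str,
    left: &[&str],
    right_alias: &str,
    right: &[&str],
) -> Vec<String> {
    let l = left.iter().map(|f| format!("{}.{}", left_alias, f));
    let r = right.iter().map(|f| format!("{}.{}", right_alias, f));
    // Left fields first, then right fields, as in the join schema under review.
    l.chain(r).collect()
}
```

Calling `qualified_join_fields("a", &["id", "name"], "b", &["id", "name"])` yields `a.id`, `a.name`, `b.id`, `b.name`.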
andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r527019693

## File path: rust/datafusion/src/physical_plan/hash_utils.rs ##
@@ -0,0 +1,144 @@
+//! Functionality used both on logical and physical plans
+
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::{Field, Schema};
+use std::collections::HashSet;
+
+/// All valid types of joins.
+#[derive(Clone, Debug)]
+pub enum JoinType {
+    /// Inner join
+    Inner,
+}
+
+/// Checks whether the schemas "left" and "right" and columns "on" represent a valid join.
+/// They are valid whenever their columns' intersection equals the set `on`
+pub fn check_join_is_valid(
+    left: &Schema,
+    right: &Schema,
+    on: &HashSet<String>,
+) -> Result<()> {
+    let left: HashSet<String> = left.fields().iter().map(|f| f.name().clone()).collect();
+    let right: HashSet<String> =
+        right.fields().iter().map(|f| f.name().clone()).collect();
+
+    check_join_set_is_valid(&left, &right, &on)
+}
+
+/// Checks whether the sets left, right and on compose a valid join.
+/// They are valid whenever their intersection equals the set `on`
+fn check_join_set_is_valid(
+    left: &HashSet<String>,
+    right: &HashSet<String>,
+    on: &HashSet<String>,
+) -> Result<()> {
+    if on.len() == 0 {
+        return Err(DataFusionError::Plan(
+            "The 'on' clause of a join cannot be empty".to_string(),
+        ));
+    }
+
+    let on_columns = on.iter().collect::<HashSet<_>>();
+    let common_columns = left.intersection(&right).collect::<HashSet<_>>();
+    let missing = on_columns
+        .difference(&common_columns)
+        .collect::<HashSet<_>>();
+    if missing.len() > 0 {
+        return Err(DataFusionError::Plan(format!(
+            "The left or right side of the join does not have columns {:?} columns on \"on\": \nLeft: {:?}\nRight: {:?}\nOn: {:?}",
+            missing,
+            left,
+            right,
+            on,
+        ).to_string()));
+    };
+    Ok(())
+}
+
+/// Creates a schema for a join operation.
+/// The fields "on" from the left side are always first
+pub fn build_join_schema(
+    left: &Schema,
+    right: &Schema,
+    on: &HashSet<String>,
+    join_type: &JoinType,
+) -> Schema {
+    let fields: Vec<Field> = match join_type {
+        JoinType::Inner => {
+            // inner: all fields are there
+
+            let on_fields = left.fields().iter().filter(|f| on.contains(f.name()));
+
+            let left_fields = left.fields().iter().filter(|f| !on.contains(f.name()));
+
+            let right_fields = right.fields().iter().filter(|f| !on.contains(f.name()));
+
+            // "on" are first by construction, then left, then right
+            on_fields
+                .chain(left_fields)
+                .chain(right_fields)
+                .map(|f| f.clone())
+                .collect()

Review comment:
I don't understand why we would want to do that. Each operator has a single output schema. The join operator creates a new schema derived from the input schemas.
andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r526856756

## File path: rust/datafusion/src/physical_plan/mod.rs ##
@@ -89,6 +89,8 @@ pub trait ExecutionPlan: Debug + Send + Sync {
 pub enum Partitioning {
     /// Unknown partitioning scheme
     UnknownPartitioning(usize),
+    /// parts hashed by columns and `N` partitions
+    HashPartitioning(Vec<String>, usize),

Review comment:
It makes sense for us to add a `HashPartitioning` variant here, but it isn't required for implementing the hash join operator, since we aren't changing partitioning.
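For context on what the proposed variant would describe, here is a std-only sketch of hash partitioning. Each row is assigned to one of `N` partitions by hashing its key values, so rows with equal keys always land in the same partition. It uses `std::collections::hash_map::DefaultHasher` purely for illustration. That is not necessarily the hash DataFusion would use, and `hash_partition` is hypothetical.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Returns, for each of `n` partitions, the indices of the rows assigned to it.
/// Equal keys hash identically, so they always end up in the same partition.
fn hash_partition(keys: &[Vec<i64>], n: usize) -> Vec<Vec<usize>> {
    assert!(n > 0, "need at least one partition");
    let mut parts = vec![Vec::new(); n];
    for (row, key) in keys.iter().enumerate() {
        let mut h = DefaultHasher::new();
        key.hash(&mut h);
        parts[(h.finish() % n as u64) as usize].push(row);
    }
    parts
}
```

This co-location property is what a partitioned hash join would rely on. As the comment notes, the operator in this PR does not change partitioning, so it can do without it.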
andygrove commented on a change in pull request #8709: URL: https://github.com/apache/arrow/pull/8709#discussion_r526855265 ## File path: rust/datafusion/src/physical_plan/hash_join.rs ## @@ -0,0 +1,507 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines the join plan for executing partitions in parallel and then joining the results +//! into a set of partitions. + +use std::sync::Arc; +use std::{ +any::Any, +collections::{HashMap, HashSet}, +}; + +use async_trait::async_trait; +use futures::{Stream, StreamExt, TryStreamExt}; + +use arrow::array::{make_array, Array, MutableArrayData}; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; + +use super::{expressions::col, hash_aggregate::create_key}; +use super::{ +hash_utils::{build_join_schema, check_join_is_valid, JoinType}, +merge::MergeExec, +}; +use crate::error::{DataFusionError, Result}; + +use super::{ +group_scalar::GroupByScalar, ExecutionPlan, Partitioning, RecordBatchStream, +SendableRecordBatchStream, +}; + +// An index of (batch, row) uniquely identifying a row in a part. 
+type Index = (usize, usize); +// A pair (left index, right index) +// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different +// as a left join may issue None indices, in which case +type JoinIndex = Option<(usize, usize)>; +// Maps ["on" value] -> [list of indices with this key's value] +// E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true +// for rows 3 and 8 from batch 0 and row 6 from batch 1. +type JoinHashMap = HashMap, Vec>; +type JoinLeftData = (JoinHashMap, Vec); + +/// join execution plan executes partitions in parallel and combines them into a set of +/// partitions. +#[derive(Debug)] +pub struct HashJoinExec { +/// left side +left: Arc, +/// right side +right: Arc, +/// Set of common columns used to join on +on: HashSet, +/// How the join is performed +join_type: JoinType, +/// The schema once the join is applied +schema: SchemaRef, +} + +impl HashJoinExec { +/// Tries to create a new [HashJoinExec]. +/// # Error +/// This function errors when it is not possible to join the left and right sides on keys `on`. 
+pub fn try_new( +left: Arc, +right: Arc, +on: , +join_type: , +) -> Result { +let left_schema = left.schema(); +let right_schema = right.schema(); +check_join_is_valid(_schema, _schema, )?; + +let on = on.iter().map(|s| s.clone()).collect::>(); + +let schema = Arc::new(build_join_schema( +_schema, +_schema, +, +_type, +)); + +Ok(HashJoinExec { +left, +right, +on: on.clone(), +join_type: join_type.clone(), +schema, +}) +} +} + +#[async_trait] +impl ExecutionPlan for HashJoinExec { +fn as_any() -> Any { +self +} + +fn schema() -> SchemaRef { +self.schema.clone() +} + +fn children() -> Vec> { +vec![self.left.clone(), self.right.clone()] +} + +fn with_new_children( +, +children: Vec>, +) -> Result> { +match children.len() { +2 => Ok(Arc::new(HashJoinExec::try_new( +children[0].clone(), +children[1].clone(), +, +_type, +)?)), +_ => Err(DataFusionError::Internal( +"HashJoinExec wrong number of children".to_string(), +)), +} +} + +fn output_partitioning() -> Partitioning { +self.right.output_partitioning() Review comment: This is correct. The output partitioning is the same as the partitioning of the right-hand input. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
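A minimal sketch of why the output partitioning follows the right-hand input, assuming plain Rust rows instead of Arrow `RecordBatch`es; `build_left` and `execute_partition` are hypothetical names, not DataFusion APIs. The whole left side is hashed once, and each output partition is produced by probing exactly one right partition:

```rust
use std::collections::HashMap;

// Hypothetical row shape for this sketch: (join key, payload).
type Row = (i64, &'static str);
// Joined output row: (key, left payload, right payload).
type Joined = (i64, &'static str, &'static str);

/// Build phase: hash the *entire* left input, regardless of its partitioning.
fn build_left(left: &[Row]) -> HashMap<i64, Vec<&'static str>> {
    let mut table: HashMap<i64, Vec<&'static str>> = HashMap::new();
    for &(key, payload) in left {
        table.entry(key).or_default().push(payload);
    }
    table
}

/// Probe phase for one output partition: only rows from the matching
/// right partition are probed, so output partition `i` is derived
/// solely from right partition `i`.
fn execute_partition(
    table: &HashMap<i64, Vec<&'static str>>,
    right_partition: &[Row],
) -> Vec<Joined> {
    let mut out = Vec::new();
    for &(key, right_payload) in right_partition {
        if let Some(left_payloads) = table.get(&key) {
            for &left_payload in left_payloads {
                out.push((key, left_payload, right_payload));
            }
        }
    }
    out
}

fn main() {
    let left: Vec<Row> = vec![(1, "l1"), (2, "l2")];
    let right_partitions: Vec<Vec<Row>> = vec![vec![(1, "r1")], vec![(2, "r2"), (3, "r3")]];

    let table = build_left(&left);
    let outputs: Vec<Vec<Joined>> = right_partitions
        .iter()
        .map(|p| execute_partition(&table, p))
        .collect();

    // One output partition per right input partition.
    assert_eq!(outputs.len(), right_partitions.len());
    assert_eq!(outputs[0], vec![(1, "l1", "r1")]);
    assert_eq!(outputs[1], vec![(2, "l2", "r2")]);
}
```

Because no output partition ever mixes rows from two right partitions, reporting the right input's partitioning is accurate.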
andygrove commented on a change in pull request #8709: URL: https://github.com/apache/arrow/pull/8709#discussion_r526851354 ## File path: rust/datafusion/src/physical_plan/hash_utils.rs ## @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Functionality used both on logical and physical plans + +use crate::error::{DataFusionError, Result}; +use arrow::datatypes::{Field, Schema}; +use std::collections::HashSet; + +/// All valid types of joins. +#[derive(Clone, Debug)] +pub enum JoinType { +/// Inner join +Inner, +} + +/// Checks whether the schemas "left" and "right" and columns "on" represent a valid join. +/// They are valid whenever their columns' intersection equals the set `on` +pub fn check_join_is_valid( +left: , +right: , +on: , +) -> Result<()> { +let left: HashSet = left.fields().iter().map(|f| f.name().clone()).collect(); +let right: HashSet = +right.fields().iter().map(|f| f.name().clone()).collect(); + +check_join_set_is_valid(, , ) +} + +/// Checks whether the sets left, right and on compose a valid join. 
+/// They are valid whenever their intersection equals the set `on` +fn check_join_set_is_valid( +left: , +right: , +on: , +) -> Result<()> { +if on.len() == 0 { +return Err(DataFusionError::Plan( +"The 'on' clause of a join cannot be empty".to_string(), +)); +} + +let on_columns = on.iter().collect::>(); +let common_columns = left.intersection().collect::>(); +let missing = on_columns +.difference(_columns) +.collect::>(); +if missing.len() > 0 { +return Err(DataFusionError::Plan(format!( +"The left or right side of the join does not have columns {:?} columns on \"on\": \nLeft: {:?}\nRight: {:?}\nOn: {:?}", +missing, +left, +right, +on, +).to_string())); +}; +Ok(()) +} + +/// Creates a schema for a join operation. +/// The fields "on" from the left side are always first +pub fn build_join_schema( +left: , +right: , +on: , +join_type: , +) -> Schema { +let fields: Vec = match join_type { +JoinType::Inner => { +// inner: all fields are there + +let on_fields = left.fields().iter().filter(|f| on.contains(f.name())); + +let left_fields = left.fields().iter().filter(|f| !on.contains(f.name())); + +let right_fields = right.fields().iter().filter(|f| !on.contains(f.name())); + +// "on" are first by construction, then left, then right +on_fields +.chain(left_fields) +.chain(right_fields) +.map(|f| f.clone()) +.collect() Review comment: It would be more standard to have all the left fields followed by all the right fields. Also, the new schema should have fully qualified names that include the table/alias prefix. For example, if we join table 'a' with fields `id` and `name` with table 'b', which also has fields `id` and `name`, the output schema would be `a.id`, `a.name`, `b.id`, `b.name`. It is quite possible that we currently lack support for referencing these qualified field names, so we will need to deal with that before we can fully integrate this new join operator into DataFusion.
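A minimal sketch of the suggested output naming, assuming columns are identified by plain strings rather than Arrow `Field`s; `qualified_join_fields` is a hypothetical helper, not part of DataFusion. All left fields come first, then all right fields, each prefixed with its table alias so that same-named columns stay distinguishable:

```rust
/// Hypothetical helper: output column names for a join, left fields first,
/// then right fields, each qualified as `<alias>.<field>`.
fn qualified_join_fields(
    left_alias: &str,
    left_fields: &[&str],
    right_alias: &str,
    right_fields: &[&str],
) -> Vec<String> {
    left_fields
        .iter()
        .map(|field| format!("{}.{}", left_alias, field))
        .chain(
            right_fields
                .iter()
                .map(|field| format!("{}.{}", right_alias, field)),
        )
        .collect()
}

fn main() {
    // Table 'a' (id, name) joined with table 'b' (id, name).
    let fields = qualified_join_fields("a", &["id", "name"], "b", &["id", "name"]);
    assert_eq!(fields, vec!["a.id", "a.name", "b.id", "b.name"]);
    println!("{:?}", fields);
}
```

Note that the join key `id` appears twice, once per side; with unqualified names the two `id` columns would collide.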
andygrove commented on a change in pull request #8709: URL: https://github.com/apache/arrow/pull/8709#discussion_r526849347 ## File path: rust/datafusion/src/physical_plan/hash_join.rs ## @@ -0,0 +1,467 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines the join plan for executing partitions in parallel and then joining the results +//! into a set of partitions. + +use std::sync::Arc; +use std::{ +any::Any, +collections::{HashMap, HashSet}, +}; + +use async_trait::async_trait; +use futures::{Stream, StreamExt, TryStreamExt}; + +use arrow::array::{make_array, Array, MutableArrayData}; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; + +use super::hash_utils::{build_join_schema, check_join_is_valid, JoinHow}; +use super::{expressions::col, hash_aggregate::create_key}; +use crate::error::{DataFusionError, Result}; + +use super::{ +group_scalar::GroupByScalar, ExecutionPlan, Partitioning, RecordBatchStream, +SendableRecordBatchStream, +}; + +// An index of (batch, row) uniquely identifying a row in a part. +type Index = (usize, usize); +// None represents a null (e.g. 
in case of a left join, some right indices are null) +type JoinIndex = (usize, usize); +// A mapping "on" value -> list of row indexes with this key's value +// E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true +// for rows 3 and 8 from batch 0 and row 6 from batch 1. +type JoinHashMap = HashMap, Vec>; +type JoinLeftData = (JoinHashMap, Vec); + +/// join execution plan executes partitions in parallel and combines them into a set of +/// partitions. +#[derive(Debug)] +pub struct HashJoinExec { +/// left side +left: Arc, +/// right side +right: Arc, +/// Set of common columns used to join on +on: HashSet, Review comment: After thinking about this some more, I think it might be better to address this in the current PR. I would suggest either: ```rust left_keys: Vec, right_keys: Vec, ``` or the more general case: ```rust left_keys: Vec>, right_keys: Vec>, ``` It should be possible to perform an equi-join using any deterministic expression, for example: ``` SELECT a.*, b.* FROM a JOIN b ON UPPER(a.name) = UPPER(CONCAT(b.first, ' ', b.last)) ```
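A minimal sketch of the more general form, assuming each side's key expression is modelled as a Rust closure over a row rather than, e.g., a DataFusion `PhysicalExpr` evaluated over a `RecordBatch`; `hash_equi_join`, `A` and `B` are hypothetical names used only for illustration. The example reproduces the `UPPER(a.name) = UPPER(CONCAT(b.first, ' ', b.last))` join from the comment:

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical equi-join where each side supplies its own deterministic
/// key expression. Returns (left row index, right row index) pairs whose
/// computed keys are equal.
fn hash_equi_join<L, R, K, FL, FR>(
    left: &[L],
    right: &[R],
    left_key: FL,
    right_key: FR,
) -> Vec<(usize, usize)>
where
    K: Eq + Hash,
    FL: Fn(&L) -> K,
    FR: Fn(&R) -> K,
{
    // Build: evaluate the left key expression once per left row.
    let mut table: HashMap<K, Vec<usize>> = HashMap::new();
    for (i, row) in left.iter().enumerate() {
        table.entry(left_key(row)).or_default().push(i);
    }
    // Probe: evaluate the right key expression and look up matches.
    let mut matches = Vec::new();
    for (j, row) in right.iter().enumerate() {
        if let Some(left_rows) = table.get(&right_key(row)) {
            for &i in left_rows {
                matches.push((i, j));
            }
        }
    }
    matches
}

struct A {
    name: String,
}

struct B {
    first: String,
    last: String,
}

fn main() {
    let a = vec![
        A { name: "Ada Lovelace".to_string() },
        A { name: "Alan Turing".to_string() },
    ];
    let b = vec![
        B { first: "alan".to_string(), last: "turing".to_string() },
        B { first: "grace".to_string(), last: "hopper".to_string() },
    ];
    // ON UPPER(a.name) = UPPER(CONCAT(b.first, ' ', b.last))
    let pairs = hash_equi_join(
        &a,
        &b,
        |row: &A| row.name.to_uppercase(),
        |row: &B| format!("{} {}", row.first, row.last).to_uppercase(),
    );
    assert_eq!(pairs, vec![(1, 0)]);
}
```

Plain column keys are the special case where each closure just reads a field, which is why the expression form subsumes the `Vec` of column names.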
andygrove commented on a change in pull request #8709: URL: https://github.com/apache/arrow/pull/8709#discussion_r526837738 ## File path: rust/datafusion/src/physical_plan/hash_join.rs ## @@ -0,0 +1,507 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines the join plan for executing partitions in parallel and then joining the results +//! into a set of partitions. + +use std::sync::Arc; +use std::{ +any::Any, +collections::{HashMap, HashSet}, +}; + +use async_trait::async_trait; +use futures::{Stream, StreamExt, TryStreamExt}; + +use arrow::array::{make_array, Array, MutableArrayData}; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; + +use super::{expressions::col, hash_aggregate::create_key}; +use super::{ +hash_utils::{build_join_schema, check_join_is_valid, JoinType}, +merge::MergeExec, +}; +use crate::error::{DataFusionError, Result}; + +use super::{ +group_scalar::GroupByScalar, ExecutionPlan, Partitioning, RecordBatchStream, +SendableRecordBatchStream, +}; + +// An index of (batch, row) uniquely identifying a row in a part. 
+type Index = (usize, usize); +// A pair (left index, right index) +// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different +// as a left join may issue None indices, in which case +type JoinIndex = Option<(usize, usize)>; +// Maps ["on" value] -> [list of indices with this key's value] +// E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true +// for rows 3 and 8 from batch 0 and row 6 from batch 1. +type JoinHashMap = HashMap, Vec>; +type JoinLeftData = (JoinHashMap, Vec); + +/// join execution plan executes partitions in parallel and combines them into a set of +/// partitions. +#[derive(Debug)] +pub struct HashJoinExec { +/// left side +left: Arc, +/// right side +right: Arc, +/// Set of common columns used to join on +on: HashSet, +/// How the join is performed +join_type: JoinType, +/// The schema once the join is applied +schema: SchemaRef, +} + +impl HashJoinExec { +/// Tries to create a new [HashJoinExec]. +/// # Error +/// This function errors when it is not possible to join the left and right sides on keys `on`. 
+pub fn try_new( +left: Arc, +right: Arc, +on: , +join_type: , +) -> Result { +let left_schema = left.schema(); +let right_schema = right.schema(); +check_join_is_valid(_schema, _schema, )?; + +let on = on.iter().map(|s| s.clone()).collect::>(); + +let schema = Arc::new(build_join_schema( +_schema, +_schema, +, +_type, +)); + +Ok(HashJoinExec { +left, +right, +on: on.clone(), +join_type: join_type.clone(), +schema, +}) +} +} + +#[async_trait] +impl ExecutionPlan for HashJoinExec { +fn as_any() -> Any { +self +} + +fn schema() -> SchemaRef { +self.schema.clone() +} + +fn children() -> Vec> { +vec![self.left.clone(), self.right.clone()] +} + +fn with_new_children( +, +children: Vec>, +) -> Result> { +match children.len() { +2 => Ok(Arc::new(HashJoinExec::try_new( +children[0].clone(), +children[1].clone(), +, +_type, +)?)), +_ => Err(DataFusionError::Internal( +"HashJoinExec wrong number of children".to_string(), +)), +} +} + +fn output_partitioning() -> Partitioning { +self.right.output_partitioning() +} + +async fn execute(, partition: usize) -> Result { +// merge all parts into a single stream +// this is currently expensive as we re-compute this for every part from the right +// TODO: Fix this issue: we can't share this state across parts on the right. Review comment: I would be fine with us doing this as a follow-up and get the expensive version of this
andygrove commented on a change in pull request #8709: URL: https://github.com/apache/arrow/pull/8709#discussion_r526834131 ## File path: rust/datafusion/src/physical_plan/hash_join.rs ## @@ -0,0 +1,507 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines the join plan for executing partitions in parallel and then joining the results +//! into a set of partitions. + +use std::sync::Arc; +use std::{ +any::Any, +collections::{HashMap, HashSet}, +}; + +use async_trait::async_trait; +use futures::{Stream, StreamExt, TryStreamExt}; + +use arrow::array::{make_array, Array, MutableArrayData}; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; + +use super::{expressions::col, hash_aggregate::create_key}; +use super::{ +hash_utils::{build_join_schema, check_join_is_valid, JoinType}, +merge::MergeExec, +}; +use crate::error::{DataFusionError, Result}; + +use super::{ +group_scalar::GroupByScalar, ExecutionPlan, Partitioning, RecordBatchStream, +SendableRecordBatchStream, +}; + +// An index of (batch, row) uniquely identifying a row in a part. 
+type Index = (usize, usize); +// A pair (left index, right index) +// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different +// as a left join may issue None indices, in which case +type JoinIndex = Option<(usize, usize)>; +// Maps ["on" value] -> [list of indices with this key's value] +// E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true +// for rows 3 and 8 from batch 0 and row 6 from batch 1. +type JoinHashMap = HashMap, Vec>; +type JoinLeftData = (JoinHashMap, Vec); + +/// join execution plan executes partitions in parallel and combines them into a set of +/// partitions. +#[derive(Debug)] +pub struct HashJoinExec { +/// left side +left: Arc, +/// right side +right: Arc, +/// Set of common columns used to join on +on: HashSet, +/// How the join is performed +join_type: JoinType, +/// The schema once the join is applied +schema: SchemaRef, +} + +impl HashJoinExec { +/// Tries to create a new [HashJoinExec]. +/// # Error +/// This function errors when it is not possible to join the left and right sides on keys `on`. 
+pub fn try_new( +left: Arc, +right: Arc, +on: , +join_type: , +) -> Result { +let left_schema = left.schema(); +let right_schema = right.schema(); +check_join_is_valid(_schema, _schema, )?; + +let on = on.iter().map(|s| s.clone()).collect::>(); + +let schema = Arc::new(build_join_schema( +_schema, +_schema, +, +_type, +)); + +Ok(HashJoinExec { +left, +right, +on: on.clone(), +join_type: join_type.clone(), +schema, +}) +} +} + +#[async_trait] +impl ExecutionPlan for HashJoinExec { +fn as_any() -> Any { +self +} + +fn schema() -> SchemaRef { +self.schema.clone() +} + +fn children() -> Vec> { +vec![self.left.clone(), self.right.clone()] +} + +fn with_new_children( +, +children: Vec>, +) -> Result> { +match children.len() { +2 => Ok(Arc::new(HashJoinExec::try_new( +children[0].clone(), +children[1].clone(), +, +_type, +)?)), +_ => Err(DataFusionError::Internal( +"HashJoinExec wrong number of children".to_string(), +)), +} +} + +fn output_partitioning() -> Partitioning { +self.right.output_partitioning() +} + +async fn execute(, partition: usize) -> Result { +// merge all parts into a single stream +// this is currently expensive as we re-compute this for every part from the right +// TODO: Fix this issue: we can't share this state across parts on the right. Review comment: When I tried to implement this, my approach was to store the materialized left-hand side
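One way to avoid re-computing the left side for every right partition is to build it once and share it. The sketch below assumes a synchronous, row-based operator and uses `std::sync::OnceLock` (Rust 1.70+); `CachedHashJoin` is a hypothetical type, and DataFusion's async `execute` would need an async-aware equivalent (for instance `tokio::sync::OnceCell`) instead:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};

/// Left-side hash table: join key -> indices of left rows with that key.
type LeftTable = HashMap<i64, Vec<usize>>;

/// Hypothetical operator: the left side is built at most once and the
/// resulting table is shared by every right partition.
struct CachedHashJoin {
    left_keys: Vec<i64>,
    left_table: OnceLock<Arc<LeftTable>>,
    builds: AtomicUsize, // counts how many times the build phase ran
}

impl CachedHashJoin {
    fn new(left_keys: Vec<i64>) -> Self {
        CachedHashJoin {
            left_keys,
            left_table: OnceLock::new(),
            builds: AtomicUsize::new(0),
        }
    }

    /// Returns the shared left table, building it on first use only.
    fn left_table(&self) -> Arc<LeftTable> {
        self.left_table
            .get_or_init(|| {
                self.builds.fetch_add(1, Ordering::SeqCst);
                let mut table = LeftTable::new();
                for (i, key) in self.left_keys.iter().enumerate() {
                    table.entry(*key).or_default().push(i);
                }
                Arc::new(table)
            })
            .clone()
    }

    /// Probes one right partition; returns (left index, right index) pairs.
    fn execute(&self, right_partition: &[i64]) -> Vec<(usize, usize)> {
        let table = self.left_table();
        let mut out = Vec::new();
        for (j, key) in right_partition.iter().enumerate() {
            if let Some(left_rows) = table.get(key) {
                out.extend(left_rows.iter().map(|&i| (i, j)));
            }
        }
        out
    }
}

fn main() {
    let join = CachedHashJoin::new(vec![10, 20, 10]);
    let right_partitions = vec![vec![10], vec![20, 30], vec![10, 20]];
    let results: Vec<_> = right_partitions.iter().map(|p| join.execute(p)).collect();

    assert_eq!(results[0], vec![(0, 0), (2, 0)]);
    assert_eq!(results[1], vec![(1, 0)]);
    // Three right partitions were probed, but the left side was built once.
    assert_eq!(join.builds.load(Ordering::SeqCst), 1);
}
```

The trade-off is that the materialized left side stays in memory for the lifetime of the operator, which is the same cost the expensive version pays per partition, just paid once.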
andygrove commented on a change in pull request #8709: URL: https://github.com/apache/arrow/pull/8709#discussion_r526467098 ## File path: rust/datafusion/src/physical_plan/hash_join.rs ## @@ -0,0 +1,467 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines the join plan for executing partitions in parallel and then joining the results +//! into a set of partitions. + +use std::sync::Arc; +use std::{ +any::Any, +collections::{HashMap, HashSet}, +}; + +use async_trait::async_trait; +use futures::{Stream, StreamExt, TryStreamExt}; + +use arrow::array::{make_array, Array, MutableArrayData}; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; + +use super::hash_utils::{build_join_schema, check_join_is_valid, JoinHow}; +use super::{expressions::col, hash_aggregate::create_key}; +use crate::error::{DataFusionError, Result}; + +use super::{ +group_scalar::GroupByScalar, ExecutionPlan, Partitioning, RecordBatchStream, +SendableRecordBatchStream, +}; + +// An index of (batch, row) uniquely identifying a row in a part. +type Index = (usize, usize); +// None represents a null (e.g. 
in case of a left join, some right indices are null) +type JoinIndex = (usize, usize); +// A mapping "on" value -> list of row indexes with this key's value +// E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true +// for rows 3 and 8 from batch 0 and row 6 from batch 1. +type JoinHashMap = HashMap, Vec>; +type JoinLeftData = (JoinHashMap, Vec); + +/// join execution plan executes partitions in parallel and combines them into a set of +/// partitions. +#[derive(Debug)] +pub struct HashJoinExec { +/// left side +left: Arc, +/// right side +right: Arc, +/// Set of common columns used to join on +on: HashSet, Review comment: This is a great start, but it will limit us to cases where the column names are the same in both tables. As a follow-on, we can expand this to support different names, e.g. `customer.id = orders.customer_id`.
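A minimal sketch of that follow-on, assuming the `on` clause becomes a list of (left column, right column) pairs and that schemas are reduced to lists of column names; `check_join_on` is a hypothetical function, not the PR's `check_join_is_valid`:

```rust
/// Hypothetical validation for an `on` clause expressed as (left column,
/// right column) pairs, so the two sides may use different names,
/// e.g. `customer.id = orders.customer_id`.
fn check_join_on(
    left_fields: &[&str],
    right_fields: &[&str],
    on: &[(&str, &str)],
) -> Result<(), String> {
    if on.is_empty() {
        return Err("The 'on' clause of a join cannot be empty".to_string());
    }
    for (left_col, right_col) in on {
        // Each name is checked only against its own side's schema.
        if !left_fields.contains(left_col) {
            return Err(format!("left side has no column {:?}", left_col));
        }
        if !right_fields.contains(right_col) {
            return Err(format!("right side has no column {:?}", right_col));
        }
    }
    Ok(())
}

fn main() {
    let customer = ["id", "name"];
    let orders = ["order_id", "customer_id", "amount"];

    // customer.id = orders.customer_id: names differ but both columns exist.
    assert!(check_join_on(&customer, &orders, &[("id", "customer_id")]).is_ok());
    // Requiring identical names would fail here: `orders` has no `id` column.
    assert!(check_join_on(&customer, &orders, &[("id", "id")]).is_err());
    assert!(check_join_on(&customer, &orders, &[]).is_err());
}
```

Unlike an intersection-of-names check, the pair form never requires a column name to exist on both sides.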
andygrove commented on a change in pull request #8709: URL: https://github.com/apache/arrow/pull/8709#discussion_r526466282 ## File path: rust/datafusion/src/physical_plan/hash_utils.rs ## @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Functionality used both on logical and physical plans + +use crate::error::{DataFusionError, Result}; +use arrow::datatypes::{Field, Schema}; +use std::collections::HashSet; + +/// All valid types of joins. +#[derive(Clone, Debug)] +pub enum JoinHow { Review comment: nit: `JoinType` would be a more standard name