[GitHub] [arrow] andygrove commented on a change in pull request #8709: ARROW-9555: [Rust] [DataFusion] Implement physical node for inner join

2020-11-20 Thread GitBox


andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r527750363



##
File path: rust/datafusion/src/physical_plan/hash_utils.rs
##
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Functionality used both on logical and physical plans
+
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::{Field, Schema};
+use std::collections::HashSet;
+
+/// All valid types of joins.
+#[derive(Clone, Debug)]
+pub enum JoinType {
+    /// Inner join
+    Inner,
+}
+
+/// The on clause of the join, as vector of (left, right) columns.
+pub type JoinOn<'a> = [(&'a str, &'a str)];
+
+/// Checks whether the schemas "left" and "right" and columns "on" represent a valid join.
+/// They are valid whenever their columns' intersection equals the set `on`
+pub fn check_join_is_valid(left: &Schema, right: &Schema, on: &JoinOn) -> Result<()> {
+    let left: HashSet<String> = left.fields().iter().map(|f| f.name().clone()).collect();
+    let right: HashSet<String> =
+        right.fields().iter().map(|f| f.name().clone()).collect();
+
+    check_join_set_is_valid(&left, &right, on)
+}
+
+/// Checks whether the sets left, right and on compose a valid join.
+/// They are valid whenever their intersection equals the set `on`
+fn check_join_set_is_valid(
+    left: &HashSet<String>,
+    right: &HashSet<String>,
+    on: &JoinOn,
+) -> Result<()> {
+    if on.len() == 0 {
+        return Err(DataFusionError::Plan(
+            "The 'on' clause of a join cannot be empty".to_string(),
+        ));
+    }
+    let on_left = &on.iter().map(|on| on.0.to_string()).collect::<HashSet<_>>();
+    let left_missing = left.difference(on_left).collect::<HashSet<_>>();

Review comment:
   Most of the tests pass with this code:
   
   ```rust
   let on_left = &on.iter().map(|on| on.0.to_string()).collect::<HashSet<_>>();
   let left_keys = left
       .iter()
       .filter(|f| on_left.contains(*f))
       .map(|s| s.clone())
       .collect::<HashSet<_>>();
   let left_missing = on_left.difference(&left_keys).collect::<HashSet<_>>();
   
   let on_right = &on.iter().map(|on| on.1.to_string()).collect::<HashSet<_>>();
   let right_keys = right
       .iter()
       .filter(|f| on_right.contains(*f))
       .map(|s| s.clone())
       .collect::<HashSet<_>>();
   let right_missing = on_right.difference(&right_keys).collect::<HashSet<_>>();
   
   if left_missing.len() > 0 || right_missing.len() > 0 {
       return Err(DataFusionError::Plan(format!(
           "The left or right side of the join does not have columns \"on\": \nMissing on the left: {:?}\nMissing on the right: {:?}",
           left_missing,
           right_missing,
       )));
   };
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] andygrove commented on a change in pull request #8709: ARROW-9555: [Rust] [DataFusion] Implement physical node for inner join

2020-11-20 Thread GitBox


andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r527740558



##
File path: rust/datafusion/src/physical_plan/hash_utils.rs
##
@@ -0,0 +1,150 @@
+//! Functionality used both on logical and physical plans
+
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::{Field, Schema};
+use std::collections::HashSet;
+
+/// All valid types of joins.
+#[derive(Clone, Debug)]
+pub enum JoinType {
+    /// Inner join
+    Inner,
+}
+
+/// The on clause of the join, as vector of (left, right) columns.
+pub type JoinOn<'a> = [(&'a str, &'a str)];
+
+/// Checks whether the schemas "left" and "right" and columns "on" represent a valid join.
+/// They are valid whenever their columns' intersection equals the set `on`
+pub fn check_join_is_valid(left: &Schema, right: &Schema, on: &JoinOn) -> Result<()> {
+    let left: HashSet<String> = left.fields().iter().map(|f| f.name().clone()).collect();
+    let right: HashSet<String> =
+        right.fields().iter().map(|f| f.name().clone()).collect();
+
+    check_join_set_is_valid(&left, &right, on)
+}
+
+/// Checks whether the sets left, right and on compose a valid join.
+/// They are valid whenever their intersection equals the set `on`
+fn check_join_set_is_valid(
+    left: &HashSet<String>,
+    right: &HashSet<String>,
+    on: &JoinOn,
+) -> Result<()> {
+    if on.len() == 0 {
+        return Err(DataFusionError::Plan(
+            "The 'on' clause of a join cannot be empty".to_string(),
+        ));
+    }
+    let on_left = &on.iter().map(|on| on.0.to_string()).collect::<HashSet<_>>();
+    let left_missing = left.difference(on_left).collect::<HashSet<_>>();

Review comment:
   I'm confused here and I think this code might not be doing what you intended. `left_missing` contains all the left fields except for the fields referenced in the `on` clause, so this function will always fail unless both tables contain only join keys and no other columns.
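To make the concern concrete, here is a minimal sketch that uses plain `HashSet<String>` sets of column names instead of Arrow schemas. The helper names `names`, `buggy_missing` and `missing_keys` are hypothetical. The sketch contrasts the set difference in the quoted code with the direction a key-presence check presumably needs: the keys named in `on` that this side does not have.

```rust
use std::collections::HashSet;

/// Builds a set of owned column names from string slices.
pub fn names(cols: &[&str]) -> HashSet<String> {
    cols.iter().map(|c| c.to_string()).collect()
}

/// What the quoted code computes: every column on this side that is *not*
/// a join key. This is non-empty for any table that has payload columns.
pub fn buggy_missing(side: &HashSet<String>, on_side: &HashSet<String>) -> HashSet<String> {
    side.difference(on_side).cloned().collect()
}

/// The presumably intended check: join keys that this side does *not* contain.
pub fn missing_keys(side: &HashSet<String>, on_side: &HashSet<String>) -> HashSet<String> {
    on_side.difference(side).cloned().collect()
}
```

Take `left = {id, name}` and `on = {id}`. The first function reports `name` as missing even though the join is valid, while the second correctly reports nothing.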









[GitHub] [arrow] andygrove commented on a change in pull request #8709: ARROW-9555: [Rust] [DataFusion] Implement physical node for inner join

2020-11-20 Thread GitBox


andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r527735046



##
File path: rust/datafusion/src/physical_plan/hash_utils.rs
##
@@ -0,0 +1,150 @@
+//! Functionality used both on logical and physical plans
+
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::{Field, Schema};
+use std::collections::HashSet;
+
+/// All valid types of joins.
+#[derive(Clone, Debug)]
+pub enum JoinType {
+    /// Inner join
+    Inner,
+}
+
+/// The on clause of the join, as vector of (left, right) columns.
+pub type JoinOn<'a> = [(&'a str, &'a str)];
+
+/// Checks whether the schemas "left" and "right" and columns "on" represent a valid join.
+/// They are valid whenever their columns' intersection equals the set `on`
+pub fn check_join_is_valid(left: &Schema, right: &Schema, on: &JoinOn) -> Result<()> {
+    let left: HashSet<String> = left.fields().iter().map(|f| f.name().clone()).collect();
+    let right: HashSet<String> =
+        right.fields().iter().map(|f| f.name().clone()).collect();
+
+    check_join_set_is_valid(&left, &right, on)
+}
+
+/// Checks whether the sets left, right and on compose a valid join.
+/// They are valid whenever their intersection equals the set `on`
+fn check_join_set_is_valid(
+    left: &HashSet<String>,
+    right: &HashSet<String>,
+    on: &JoinOn,
+) -> Result<()> {
+    if on.len() == 0 {
+        return Err(DataFusionError::Plan(
+            "The 'on' clause of a join cannot be empty".to_string(),
+        ));
+    }
+    let on_left = &on.iter().map(|on| on.0.to_string()).collect::<HashSet<_>>();
+    let left_missing = left.difference(on_left).collect::<HashSet<_>>();
+
+    let on_right = &on.iter().map(|on| on.1.to_string()).collect::<HashSet<_>>();
+    let right_missing = right.difference(on_right).collect::<HashSet<_>>();
+
+    if (left_missing.len() > 0) | (right_missing.len() > 0) {
+        return Err(DataFusionError::Plan(format!(
+            "The left or right side of the join does not have columns \"on\": \nMissing on the left: {:?}\nMissing on the right: {:?}",
+            left_missing,
+            right_missing,
+        )));
+    };
+    Ok(())
+}
+
+/// Creates a schema for a join operation.
+/// The fields from the left side are first
+pub fn build_join_schema(
+    left: &Schema,
+    right: &Schema,
+    on: &JoinOn,
+    join_type: &JoinType,
+) -> Schema {
+    let fields: Vec<Field> = match join_type {
+        JoinType::Inner => {
+            let on_left = &on.iter().map(|on| on.0.to_string()).collect::<HashSet<_>>();
+            let on_right = &on.iter().map(|on| on.1.to_string()).collect::<HashSet<_>>();
+
+            // inner: all fields are there
+            let left_fields =
+                left.fields().iter().filter(|f| !on_left.contains(f.name()));
+
+            let right_fields = right
+                .fields()
+                .iter()
+                .filter(|f| !on_right.contains(f.name()));
+
+            // left then right
+            left_fields.chain(right_fields).map(|f| f.clone()).collect()
+        }
+    };
+    Schema::new(fields)

Review comment:
   @jorgecarleitao I think the join keys are being dropped from the schema?
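The effect can be seen with a minimal sketch that uses column-name slices in place of Arrow `Schema`/`Field` values. Both function names are hypothetical. The first function mirrors the filtering in the quoted `build_join_schema`. The second keeps every field, left side first, and leaves any duplicate names to be resolved elsewhere.

```rust
use std::collections::HashSet;

/// Mirrors the quoted code: join keys are filtered out of *both* sides,
/// so no key column reaches the output schema.
pub fn schema_dropping_keys(left: &[&str], right: &[&str], on: &[(&str, &str)]) -> Vec<String> {
    let on_left: HashSet<&str> = on.iter().map(|(l, _)| *l).collect();
    let on_right: HashSet<&str> = on.iter().map(|(_, r)| *r).collect();
    left.iter()
        .filter(|c| !on_left.contains(**c))
        .chain(right.iter().filter(|c| !on_right.contains(**c)))
        .map(|c| c.to_string())
        .collect()
}

/// Keeps every column: all left fields first, then all right fields.
/// Duplicate names (e.g. `id` on both sides) are not resolved here.
pub fn schema_keeping_keys(left: &[&str], right: &[&str]) -> Vec<String> {
    left.iter().chain(right.iter()).map(|c| c.to_string()).collect()
}
```

Take `left = [id, name]`, `right = [id, price]` and `on = [(id, id)]`. The first version yields only `[name, price]`, so neither `id` column survives.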









[GitHub] [arrow] andygrove commented on a change in pull request #8709: ARROW-9555: [Rust] [DataFusion] Implement physical node for inner join

2020-11-19 Thread GitBox


andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r527067914



##
File path: rust/datafusion/src/physical_plan/hash_utils.rs
##
@@ -0,0 +1,144 @@
+//! Functionality used both on logical and physical plans
+
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::{Field, Schema};
+use std::collections::HashSet;
+
+/// All valid types of joins.
+#[derive(Clone, Debug)]
+pub enum JoinType {
+    /// Inner join
+    Inner,
+}
+
+/// Checks whether the schemas "left" and "right" and columns "on" represent a valid join.
+/// They are valid whenever their columns' intersection equals the set `on`
+pub fn check_join_is_valid(
+    left: &Schema,
+    right: &Schema,
+    on: &HashSet<String>,
+) -> Result<()> {
+    let left: HashSet<String> = left.fields().iter().map(|f| f.name().clone()).collect();
+    let right: HashSet<String> =
+        right.fields().iter().map(|f| f.name().clone()).collect();
+
+    check_join_set_is_valid(&left, &right, &on)
+}
+
+/// Checks whether the sets left, right and on compose a valid join.
+/// They are valid whenever their intersection equals the set `on`
+fn check_join_set_is_valid(
+    left: &HashSet<String>,
+    right: &HashSet<String>,
+    on: &HashSet<String>,
+) -> Result<()> {
+    if on.len() == 0 {
+        return Err(DataFusionError::Plan(
+            "The 'on' clause of a join cannot be empty".to_string(),
+        ));
+    }
+
+    let on_columns = on.iter().collect::<HashSet<_>>();
+    let common_columns = left.intersection(&right).collect::<HashSet<_>>();
+    let missing = on_columns
+        .difference(&common_columns)
+        .collect::<HashSet<_>>();
+    if missing.len() > 0 {
+        return Err(DataFusionError::Plan(format!(
+            "The left or right side of the join does not have columns {:?} columns on \"on\": \nLeft: {:?}\nRight: {:?}\nOn: {:?}",
+            missing,
+            left,
+            right,
+            on,
+        ).to_string()));
+    };
+    Ok(())
+}
+
+/// Creates a schema for a join operation.
+/// The fields "on" from the left side are always first
+pub fn build_join_schema(
+    left: &Schema,
+    right: &Schema,
+    on: &HashSet<String>,
+    join_type: &JoinType,
+) -> Schema {
+    let fields: Vec<Field> = match join_type {
+        JoinType::Inner => {
+            // inner: all fields are there
+
+            let on_fields = left.fields().iter().filter(|f| on.contains(f.name()));
+
+            let left_fields = left.fields().iter().filter(|f| !on.contains(f.name()));
+
+            let right_fields = right.fields().iter().filter(|f| !on.contains(f.name()));
+
+            // "on" are first by construction, then left, then right
+            on_fields
+                .chain(left_fields)
+                .chain(right_fields)
+                .map(|f| f.clone())
+                .collect()

Review comment:
   I think you're right. This is simpler and forces the user to add aliases as required to avoid conflicts.
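Under that approach, a name present on both sides would appear twice in the output. The hypothetical check below sketches how such a conflict could be reported so that the user knows to alias one side. It uses plain strings instead of Arrow fields, and `check_no_conflicts` is not a DataFusion function.

```rust
use std::collections::HashSet;

/// Hypothetical check: when all fields of both inputs are kept, a name that
/// appears on both sides would be ambiguous in the output, so report it and
/// let the user alias one side before joining.
pub fn check_no_conflicts(left: &[&str], right: &[&str]) -> Result<(), String> {
    let left_names: HashSet<&str> = left.iter().copied().collect();
    let mut conflicts: Vec<&str> = right
        .iter()
        .copied()
        .filter(|c| left_names.contains(*c))
        .collect();
    if conflicts.is_empty() {
        return Ok(());
    }
    // Sort so the error message is deterministic.
    conflicts.sort_unstable();
    Err(format!(
        "join output would contain duplicate column names {:?}; alias one side first",
        conflicts
    ))
}
```

Renaming `id` to `a_id` on one side and `b_id` on the other, for example, makes the check pass.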









[GitHub] [arrow] andygrove commented on a change in pull request #8709: ARROW-9555: [Rust] [DataFusion] Implement physical node for inner join

2020-11-19 Thread GitBox


andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r527061979



##
File path: rust/datafusion/src/physical_plan/hash_join.rs
##
@@ -0,0 +1,467 @@
+//! Defines the join plan for executing partitions in parallel and then joining the results
+//! into a set of partitions.
+
+use std::sync::Arc;
+use std::{
+    any::Any,
+    collections::{HashMap, HashSet},
+};
+
+use async_trait::async_trait;
+use futures::{Stream, StreamExt, TryStreamExt};
+
+use arrow::array::{make_array, Array, MutableArrayData};
+use arrow::datatypes::{Schema, SchemaRef};
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+
+use super::hash_utils::{build_join_schema, check_join_is_valid, JoinHow};
+use super::{expressions::col, hash_aggregate::create_key};
+use crate::error::{DataFusionError, Result};
+
+use super::{
+    group_scalar::GroupByScalar, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
+};
+
+// An index of (batch, row) uniquely identifying a row in a part.
+type Index = (usize, usize);
+// None represents a null (e.g. in case of a left join, some right indices are null)
+type JoinIndex = (usize, usize);
+// A mapping "on" value -> list of row indexes with this key's value
+// E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
+// for rows 3 and 8 from batch 0 and row 6 from batch 1.
+type JoinHashMap = HashMap<Vec<GroupByScalar>, Vec<Index>>;
+type JoinLeftData = (JoinHashMap, Vec<RecordBatch>);
+
+/// join execution plan executes partitions in parallel and combines them into a set of
+/// partitions.
+#[derive(Debug)]
+pub struct HashJoinExec {
+    /// left side
+    left: Arc<dyn ExecutionPlan>,
+    /// right side
+    right: Arc<dyn ExecutionPlan>,
+    /// Set of common columns used to join on
+    on: HashSet<String>,
Review comment:
   Yes, this is an implicit projection that happens on the join expressions. In my experience, this is usually not represented as a separate step in a query plan but is handled within the join operator itself. I agree that this would be computationally equivalent to having a separate projection in the plan.

   I would be fine with supporting only joins on columns in this PR and looking at support for arbitrary expressions in a separate issue. We can always work around the limitation by doing explicit projections before the join.
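As a rough illustration of that workaround, the sketch below uses plain Rust vectors in place of Arrow record batches. All names are hypothetical, and the key expression `lower(name)` is only an example. A projection first materializes the expression as a key column. The join itself then only ever joins on that column, with a build side indexed as key -> list of (batch, row), in the spirit of the `JoinHashMap` comment in the diff.

```rust
use std::collections::HashMap;

/// A toy row: (name, value). A "batch" is a `Vec<Row>`.
pub type Row = (String, i64);

/// Explicit projection before the join (hypothetical): materialize the key
/// expression, here `lower(name)`, as its own column next to each row.
pub fn project_key(batches: &[Vec<Row>]) -> Vec<Vec<(String, Row)>> {
    batches
        .iter()
        .map(|batch| {
            batch
                .iter()
                .map(|row| (row.0.to_lowercase(), row.clone()))
                .collect::<Vec<_>>()
        })
        .collect()
}

/// Inner hash join on the projected key column only.
pub fn inner_hash_join(
    left: &[Vec<(String, Row)>],
    right: &[Vec<(String, Row)>],
) -> Vec<(Row, Row)> {
    // Build phase: key -> list of (batch, row) indices on the left side.
    let mut map: HashMap<&str, Vec<(usize, usize)>> = HashMap::new();
    for (b, batch) in left.iter().enumerate() {
        for (r, (key, _)) in batch.iter().enumerate() {
            map.entry(key.as_str()).or_default().push((b, r));
        }
    }
    // Probe phase: one output row per (matching left row, right row) pair.
    let mut out = Vec::new();
    for batch in right {
        for (key, right_row) in batch {
            if let Some(indices) = map.get(key.as_str()) {
                for &(b, r) in indices {
                    out.push((left[b][r].1.clone(), right_row.clone()));
                }
            }
        }
    }
    out
}
```

With this split, `("Alice", 1)` on the left matches both `("alice", 10)` and `("ALICE", 20)` on the right, because all three project to the key `alice`. The join step never evaluates the expression itself.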









[GitHub] [arrow] andygrove commented on a change in pull request #8709: ARROW-9555: [Rust] [DataFusion] Implement physical node for inner join

2020-11-19 Thread GitBox


andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r527028885



##
File path: rust/datafusion/src/physical_plan/hash_utils.rs
##
@@ -0,0 +1,144 @@
+//! Functionality used both on logical and physical plans
+
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::{Field, Schema};
+use std::collections::HashSet;
+
+/// All valid types of joins.
+#[derive(Clone, Debug)]
+pub enum JoinType {
+    /// Inner join
+    Inner,
+}
+
+/// Checks whether the schemas "left" and "right" and columns "on" represent a valid join.
+/// They are valid whenever their columns' intersection equals the set `on`
+pub fn check_join_is_valid(
+    left: &Schema,
+    right: &Schema,
+    on: &HashSet<String>,
+) -> Result<()> {
+    let left: HashSet<String> = left.fields().iter().map(|f| f.name().clone()).collect();
+    let right: HashSet<String> =
+        right.fields().iter().map(|f| f.name().clone()).collect();
+
+    check_join_set_is_valid(&left, &right, &on)
+}
+
+/// Checks whether the sets left, right and on compose a valid join.
+/// They are valid whenever their intersection equals the set `on`
+fn check_join_set_is_valid(
+    left: &HashSet<String>,
+    right: &HashSet<String>,
+    on: &HashSet<String>,
+) -> Result<()> {
+    if on.len() == 0 {
+        return Err(DataFusionError::Plan(
+            "The 'on' clause of a join cannot be empty".to_string(),
+        ));
+    }
+
+    let on_columns = on.iter().collect::<HashSet<_>>();
+    let common_columns = left.intersection(&right).collect::<HashSet<_>>();
+    let missing = on_columns
+        .difference(&common_columns)
+        .collect::<HashSet<_>>();
+    if missing.len() > 0 {
+        return Err(DataFusionError::Plan(format!(
+            "The left or right side of the join does not have columns {:?} columns on \"on\": \nLeft: {:?}\nRight: {:?}\nOn: {:?}",
+            missing,
+            left,
+            right,
+            on,
+        ).to_string()));
+    };
+    Ok(())
+}
+
+/// Creates a schema for a join operation.
+/// The fields "on" from the left side are always first
+pub fn build_join_schema(
+    left: &Schema,
+    right: &Schema,
+    on: &HashSet<String>,
+    join_type: &JoinType,
+) -> Schema {
+    let fields: Vec<Field> = match join_type {
+        JoinType::Inner => {
+            // inner: all fields are there
+
+            let on_fields = left.fields().iter().filter(|f| on.contains(f.name()));
+
+            let left_fields = left.fields().iter().filter(|f| !on.contains(f.name()));
+
+            let right_fields = right.fields().iter().filter(|f| !on.contains(f.name()));
+
+            // "on" are first by construction, then left, then right
+            on_fields
+                .chain(left_fields)
+                .chain(right_fields)
+                .map(|f| f.clone())
+                .collect()

Review comment:
   @jorgecarleitao Just wanted to make sure you saw this :point_up:









[GitHub] [arrow] andygrove commented on a change in pull request #8709: ARROW-9555: [Rust] [DataFusion] Implement physical node for inner join

2020-11-19 Thread GitBox


andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r527020685



##
File path: rust/datafusion/src/physical_plan/hash_utils.rs
##
@@ -0,0 +1,144 @@
+//! Functionality used both on logical and physical plans
+
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::{Field, Schema};
+use std::collections::HashSet;
+
+/// All valid types of joins.
+#[derive(Clone, Debug)]
+pub enum JoinType {
+    /// Inner join
+    Inner,
+}
+
+/// Checks whether the schemas "left" and "right" and columns "on" represent a valid join.
+/// They are valid whenever their columns' intersection equals the set `on`
+pub fn check_join_is_valid(
+    left: &Schema,
+    right: &Schema,
+    on: &HashSet<String>,
+) -> Result<()> {
+    let left: HashSet<String> = left.fields().iter().map(|f| f.name().clone()).collect();
+    let right: HashSet<String> =
+        right.fields().iter().map(|f| f.name().clone()).collect();
+
+    check_join_set_is_valid(&left, &right, &on)
+}
+
+/// Checks whether the sets left, right and on compose a valid join.
+/// They are valid whenever their intersection equals the set `on`
+fn check_join_set_is_valid(
+    left: &HashSet<String>,
+    right: &HashSet<String>,
+    on: &HashSet<String>,
+) -> Result<()> {
+    if on.len() == 0 {
+        return Err(DataFusionError::Plan(
+            "The 'on' clause of a join cannot be empty".to_string(),
+        ));
+    }
+
+    let on_columns = on.iter().collect::<HashSet<_>>();
+    let common_columns = left.intersection(&right).collect::<HashSet<_>>();
+    let missing = on_columns
+        .difference(&common_columns)
+        .collect::<HashSet<_>>();
+    if missing.len() > 0 {
+        return Err(DataFusionError::Plan(format!(
+            "The left or right side of the join does not have columns {:?} columns on \"on\": \nLeft: {:?}\nRight: {:?}\nOn: {:?}",
+            missing,
+            left,
+            right,
+            on,
+        ).to_string()));
+    };
+    Ok(())
+}
+
+/// Creates a schema for a join operation.
+/// The fields "on" from the left side are always first
+pub fn build_join_schema(
+    left: &Schema,
+    right: &Schema,
+    on: &HashSet<String>,
+    join_type: &JoinType,
+) -> Schema {
+    let fields: Vec<Field> = match join_type {
+        JoinType::Inner => {
+            // inner: all fields are there
+
+            let on_fields = left.fields().iter().filter(|f| on.contains(f.name()));
+
+            let left_fields = left.fields().iter().filter(|f| !on.contains(f.name()));
+
+            let right_fields = right.fields().iter().filter(|f| !on.contains(f.name()));
+
+            // "on" are first by construction, then left, then right
+            on_fields
+                .chain(left_fields)
+                .chain(right_fields)
+                .map(|f| f.clone())
+                .collect()

Review comment:
   The output schema of the join would simply have fields named "a.id", "a.name", "b.id", "b.name" based on the previous example.

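Here is a minimal sketch of that naming scheme. It assumes each input is referenced through an alias (`a` and `b`) and uses plain strings in place of Arrow fields. `qualified_join_fields` is a hypothetical helper, not part of DataFusion.

```rust
/// Prefixes every column with its relation alias, e.g. `id` -> `a.id`.
fn qualify(alias: &str, cols: &[&str]) -> Vec<String> {
    cols.iter().map(|c| format!("{}.{}", alias, c)).collect()
}

/// Hypothetical helper: the output fields of `a JOIN b`, left side first,
/// keeping the join keys from both sides under unambiguous qualified names.
pub fn qualified_join_fields(
    left_alias: &str,
    left: &[&str],
    right_alias: &str,
    right: &[&str],
) -> Vec<String> {
    let mut fields = qualify(left_alias, left);
    fields.extend(qualify(right_alias, right));
    fields
}
```

Because every name is qualified, both `id` columns can stay in the schema without colliding.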








[GitHub] [arrow] andygrove commented on a change in pull request #8709: ARROW-9555: [Rust] [DataFusion] Implement physical node for inner join

2020-11-19 Thread GitBox


andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r527019693



##
File path: rust/datafusion/src/physical_plan/hash_utils.rs
##
@@ -0,0 +1,144 @@
+//! Functionality used both on logical and physical plans
+
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::{Field, Schema};
+use std::collections::HashSet;
+
+/// All valid types of joins.
+#[derive(Clone, Debug)]
+pub enum JoinType {
+    /// Inner join
+    Inner,
+}
+
+/// Checks whether the schemas "left" and "right" and columns "on" represent a valid join.
+/// They are valid whenever their columns' intersection equals the set `on`
+pub fn check_join_is_valid(
+    left: &Schema,
+    right: &Schema,
+    on: &HashSet<String>,
+) -> Result<()> {
+    let left: HashSet<String> = left.fields().iter().map(|f| f.name().clone()).collect();
+    let right: HashSet<String> =
+        right.fields().iter().map(|f| f.name().clone()).collect();
+
+    check_join_set_is_valid(&left, &right, &on)
+}
+
+/// Checks whether the sets left, right and on compose a valid join.
+/// They are valid whenever their intersection equals the set `on`
+fn check_join_set_is_valid(
+    left: &HashSet<String>,
+    right: &HashSet<String>,
+    on: &HashSet<String>,
+) -> Result<()> {
+    if on.len() == 0 {
+        return Err(DataFusionError::Plan(
+            "The 'on' clause of a join cannot be empty".to_string(),
+        ));
+    }
+
+    let on_columns = on.iter().collect::<HashSet<_>>();
+    let common_columns = left.intersection(&right).collect::<HashSet<_>>();
+    let missing = on_columns
+        .difference(&common_columns)
+        .collect::<HashSet<_>>();
+    if missing.len() > 0 {
+        return Err(DataFusionError::Plan(format!(
+            "The left or right side of the join does not have columns {:?} columns on \"on\": \nLeft: {:?}\nRight: {:?}\nOn: {:?}",
+            missing,
+            left,
+            right,
+            on,
+        ).to_string()));
+    };
+    Ok(())
+}
+
+/// Creates a schema for a join operation.
+/// The fields "on" from the left side are always first
+pub fn build_join_schema(
+    left: &Schema,
+    right: &Schema,
+    on: &HashSet<String>,
+    join_type: &JoinType,
+) -> Schema {
+    let fields: Vec<Field> = match join_type {
+        JoinType::Inner => {
+            // inner: all fields are there
+
+            let on_fields = left.fields().iter().filter(|f| on.contains(f.name()));
+
+            let left_fields = left.fields().iter().filter(|f| !on.contains(f.name()));
+
+            let right_fields = right.fields().iter().filter(|f| !on.contains(f.name()));
+
+            // "on" are first by construction, then left, then right
+            on_fields
+                .chain(left_fields)
+                .chain(right_fields)
+                .map(|f| f.clone())
+                .collect()

Review comment:
   I don't understand why we would want to do that. Each operator has a single output schema. The join operator creates a new schema derived from the input schemas.









[GitHub] [arrow] andygrove commented on a change in pull request #8709: ARROW-9555: [Rust] [DataFusion] Implement physical node for inner join

2020-11-19 Thread GitBox


andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r526856756



##
File path: rust/datafusion/src/physical_plan/mod.rs
##
@@ -89,6 +89,8 @@ pub trait ExecutionPlan: Debug + Send + Sync {
 pub enum Partitioning {
 /// Unknown partitioning scheme
 UnknownPartitioning(usize),
+/// parts hashed by columns and `N` partitions
+HashPartitioning(Vec, usize),

Review comment:
   It makes sense for us to add a `HashPartitioning` variant here, but this isn't required for implementing the hash join operator since we aren't changing partitioning.
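For reference, here is a minimal sketch of what such a variant would describe. It uses the standard library's `DefaultHasher` over generic rows rather than Arrow arrays or DataFusion expressions, and the function `hash_partition` is hypothetical. The property a partitioned hash join would rely on is that equal keys always map to the same partition.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical sketch of a "hash partitioning by key columns into `n`
/// partitions" scheme: each row goes to partition `hash(key) % n`.
pub fn hash_partition<K: Hash, R: Clone>(
    rows: &[R],
    key: impl Fn(&R) -> K,
    n: usize,
) -> Vec<Vec<R>> {
    assert!(n > 0, "need at least one partition");
    let mut parts: Vec<Vec<R>> = vec![Vec::new(); n];
    for row in rows {
        let mut hasher = DefaultHasher::new();
        key(row).hash(&mut hasher);
        let p = (hasher.finish() % n as u64) as usize;
        parts[p].push(row.clone());
    }
    parts
}
```

Which partition a given key lands in depends on the hasher. `DefaultHasher` output is not guaranteed to be stable across Rust releases, so only the co-location property should be relied on.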









[GitHub] [arrow] andygrove commented on a change in pull request #8709: ARROW-9555: [Rust] [DataFusion] Implement physical node for inner join

2020-11-19 Thread GitBox


andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r526855265



##
File path: rust/datafusion/src/physical_plan/hash_join.rs
##
@@ -0,0 +1,507 @@
+//! Defines the join plan for executing partitions in parallel and then joining the results
+//! into a set of partitions.
+
+use std::sync::Arc;
+use std::{
+    any::Any,
+    collections::{HashMap, HashSet},
+};
+
+use async_trait::async_trait;
+use futures::{Stream, StreamExt, TryStreamExt};
+
+use arrow::array::{make_array, Array, MutableArrayData};
+use arrow::datatypes::{Schema, SchemaRef};
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+
+use super::{expressions::col, hash_aggregate::create_key};
+use super::{
+    hash_utils::{build_join_schema, check_join_is_valid, JoinType},
+    merge::MergeExec,
+};
+use crate::error::{DataFusionError, Result};
+
+use super::{
+    group_scalar::GroupByScalar, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
+};
+
+// An index of (batch, row) uniquely identifying a row in a part.
+type Index = (usize, usize);
+// A pair (left index, right index)
+// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different
+// as a left join may issue None indices, in which case
+type JoinIndex = Option<(usize, usize)>;
+// Maps ["on" value] -> [list of indices with this key's value]
+// E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
+// for rows 3 and 8 from batch 0 and row 6 from batch 1.
+type JoinHashMap = HashMap<Vec<GroupByScalar>, Vec<Index>>;
+type JoinLeftData = (JoinHashMap, Vec<RecordBatch>);
+
+/// join execution plan executes partitions in parallel and combines them into a set of
+/// partitions.
+#[derive(Debug)]
+pub struct HashJoinExec {
+    /// left side
+    left: Arc<dyn ExecutionPlan>,
+    /// right side
+    right: Arc<dyn ExecutionPlan>,
+    /// Set of common columns used to join on
+    on: HashSet<String>,
+    /// How the join is performed
+    join_type: JoinType,
+    /// The schema once the join is applied
+    schema: SchemaRef,
+}
+
+impl HashJoinExec {
+    /// Tries to create a new [HashJoinExec].
+    /// # Error
+    /// This function errors when it is not possible to join the left and right sides on keys `on`.
+    pub fn try_new(
+        left: Arc<dyn ExecutionPlan>,
+        right: Arc<dyn ExecutionPlan>,
+        on: &HashSet<String>,
+        join_type: &JoinType,
+    ) -> Result<Self> {
+        let left_schema = left.schema();
+        let right_schema = right.schema();
+        check_join_is_valid(&left_schema, &right_schema, &on)?;
+
+        let on = on.iter().map(|s| s.clone()).collect::<HashSet<_>>();
+
+        let schema = Arc::new(build_join_schema(
+            &left_schema,
+            &right_schema,
+            &on,
+            &join_type,
+        ));
+
+        Ok(HashJoinExec {
+            left,
+            right,
+            on: on.clone(),
+            join_type: join_type.clone(),
+            schema,
+        })
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for HashJoinExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.left.clone(), self.right.clone()]
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            2 => Ok(Arc::new(HashJoinExec::try_new(
+                children[0].clone(),
+                children[1].clone(),
+                &self.on,
+                &self.join_type,
+            )?)),
+            _ => Err(DataFusionError::Internal(
+                "HashJoinExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        self.right.output_partitioning()

Review comment:
   This is correct. The output partitioning is the same as the partitioning 
of the right-hand input.
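A minimal std-only sketch of why this holds, assuming the left (build) side is fully materialized before probing: each right partition is probed independently against the same map from join key to `(batch, row)` positions, so output partition `i` is produced from right partition `i` alone. The helpers `build` and `probe` are hypothetical, and plain `i64` keys stand in for the `Vec<GroupByScalar>` keys used in the PR.

```rust
use std::collections::HashMap;

/// (batch, row) position of a row, like the `Index` alias in the diff.
type Index = (usize, usize);

/// Build phase (hypothetical helper): map every left-side join key to all
/// (batch, row) positions holding it.
fn build(left_batches: &[Vec<i64>]) -> HashMap<i64, Vec<Index>> {
    let mut map: HashMap<i64, Vec<Index>> = HashMap::new();
    for (batch, keys) in left_batches.iter().enumerate() {
        for (row, key) in keys.iter().enumerate() {
            map.entry(*key).or_default().push((batch, row));
        }
    }
    map
}

/// Probe phase (hypothetical helper) for ONE right partition: emits
/// (left index, right index) pairs. Only this partition's rows are read,
/// so output partition i depends on right partition i alone.
fn probe(map: &HashMap<i64, Vec<Index>>, right_partition: &[Vec<i64>]) -> Vec<(Index, Index)> {
    let mut out = Vec::new();
    for (batch, keys) in right_partition.iter().enumerate() {
        for (row, key) in keys.iter().enumerate() {
            if let Some(matches) = map.get(key) {
                for left_idx in matches {
                    out.push((*left_idx, (batch, row)));
                }
            }
        }
    }
    out
}

fn main() {
    // Left side: key 1 sits at (0, 0), (0, 2) and (1, 1).
    let map = build(&[vec![1, 2, 1], vec![3, 1]]);
    // Two right partitions, each holding a single batch.
    let right_partitions: Vec<Vec<Vec<i64>>> = vec![vec![vec![1]], vec![vec![2, 4]]];
    for (i, part) in right_partitions.iter().enumerate() {
        println!("output partition {}: {:?}", i, probe(&map, part));
    }
}
```

Because the build map is read-only during probing, the same map can serve every right partition, which is what makes mirroring the right side's partitioning natural.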






[GitHub] [arrow] andygrove commented on a change in pull request #8709: ARROW-9555: [Rust] [DataFusion] Implement physical node for inner join

2020-11-19 Thread GitBox


andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r526851354



##
File path: rust/datafusion/src/physical_plan/hash_utils.rs
##
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Functionality used both on logical and physical plans
+
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::{Field, Schema};
+use std::collections::HashSet;
+
+/// All valid types of joins.
+#[derive(Clone, Debug)]
+pub enum JoinType {
+    /// Inner join
+    Inner,
+}
+
+/// Checks whether the schemas "left" and "right" and columns "on" represent a valid join.
+/// They are valid whenever their columns' intersection equals the set `on`
+pub fn check_join_is_valid(
+    left: &Schema,
+    right: &Schema,
+    on: &HashSet<String>,
+) -> Result<()> {
+    let left: HashSet<String> = left.fields().iter().map(|f| f.name().clone()).collect();
+    let right: HashSet<String> =
+        right.fields().iter().map(|f| f.name().clone()).collect();
+
+    check_join_set_is_valid(&left, &right, &on)
+}
+
+/// Checks whether the sets left, right and on compose a valid join.
+/// They are valid whenever their intersection equals the set `on`
+fn check_join_set_is_valid(
+    left: &HashSet<String>,
+    right: &HashSet<String>,
+    on: &HashSet<String>,
+) -> Result<()> {
+    if on.len() == 0 {
+        return Err(DataFusionError::Plan(
+            "The 'on' clause of a join cannot be empty".to_string(),
+        ));
+    }
+
+    let on_columns = on.iter().collect::<HashSet<_>>();
+    let common_columns = left.intersection(&right).collect::<HashSet<_>>();
+    let missing = on_columns
+        .difference(&common_columns)
+        .collect::<HashSet<_>>();
+    if missing.len() > 0 {
+        return Err(DataFusionError::Plan(format!(
+            "The left or right side of the join does not have columns {:?} columns on \"on\": \nLeft: {:?}\nRight: {:?}\nOn: {:?}",
+            missing,
+            left,
+            right,
+            on,
+        ).to_string()));
+    };
+    Ok(())
+}
+
+/// Creates a schema for a join operation.
+/// The fields "on" from the left side are always first
+pub fn build_join_schema(
+    left: &Schema,
+    right: &Schema,
+    on: &HashSet<String>,
+    join_type: &JoinType,
+) -> Schema {
+    let fields: Vec<Field> = match join_type {
+        JoinType::Inner => {
+            // inner: all fields are there
+
+            let on_fields = left.fields().iter().filter(|f| on.contains(f.name()));
+
+            let left_fields = left.fields().iter().filter(|f| !on.contains(f.name()));
+
+            let right_fields = right.fields().iter().filter(|f| !on.contains(f.name()));
+
+            // "on" are first by construction, then left, then right
+            on_fields
+                .chain(left_fields)
+                .chain(right_fields)
+                .map(|f| f.clone())
+                .collect()

Review comment:
   It would be more standard to have all the left fields followed by all the right fields. Also, the new schema should have fully qualified names that include the table/alias prefix.
   
   For example, if we join table `a` with fields `id` and `name` to table `b`, which also has fields `id` and `name`, the output schema would be `a.id`, `a.name`, `b.id`, `b.name`.
   
   We may currently lack support for referencing these qualified field names, so we will need to address that before we can fully integrate this new join operator into DataFusion.
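A std-only sketch of the suggested ordering and qualification, using plain strings rather than arrow `Field`s. `qualified_join_columns` is a hypothetical helper; how DataFusion would actually carry qualifiers in its schemas is left open by the comment above.

```rust
/// Hypothetical helper: output column names for an inner join, ordered as
/// all left fields followed by all right fields, each qualified with its
/// table/alias prefix so same-named columns stay distinguishable.
fn qualified_join_columns(
    left_alias: &str,
    left_fields: &[&str],
    right_alias: &str,
    right_fields: &[&str],
) -> Vec<String> {
    let left = left_fields.iter().map(|f| format!("{}.{}", left_alias, f));
    let right = right_fields.iter().map(|f| format!("{}.{}", right_alias, f));
    left.chain(right).collect()
}

fn main() {
    let columns = qualified_join_columns("a", &["id", "name"], "b", &["id", "name"]);
    println!("{:?}", columns);
}
```

With qualification, both `id` columns survive in the output instead of colliding, which is also why the join keys no longer need to be hoisted to the front.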









[GitHub] [arrow] andygrove commented on a change in pull request #8709: ARROW-9555: [Rust] [DataFusion] Implement physical node for inner join

2020-11-19 Thread GitBox


andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r526849347



##
File path: rust/datafusion/src/physical_plan/hash_join.rs
##
@@ -0,0 +1,467 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the join plan for executing partitions in parallel and then joining the results
+//! into a set of partitions.
+
+use std::sync::Arc;
+use std::{
+    any::Any,
+    collections::{HashMap, HashSet},
+};
+
+use async_trait::async_trait;
+use futures::{Stream, StreamExt, TryStreamExt};
+
+use arrow::array::{make_array, Array, MutableArrayData};
+use arrow::datatypes::{Schema, SchemaRef};
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+
+use super::hash_utils::{build_join_schema, check_join_is_valid, JoinHow};
+use super::{expressions::col, hash_aggregate::create_key};
+use crate::error::{DataFusionError, Result};
+
+use super::{
+    group_scalar::GroupByScalar, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
+};
+
+// An index of (batch, row) uniquely identifying a row in a part.
+type Index = (usize, usize);
+// None represents a null (e.g. in case of a left join, some right indices are null)
+type JoinIndex = (usize, usize);
+// A mapping "on" value -> list of row indexes with this key's value
+// E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
+// for rows 3 and 8 from batch 0 and row 6 from batch 1.
+type JoinHashMap = HashMap<Vec<GroupByScalar>, Vec<Index>>;
+type JoinLeftData = (JoinHashMap, Vec<RecordBatch>);
+
+/// join execution plan executes partitions in parallel and combines them into a set of
+/// partitions.
+#[derive(Debug)]
+pub struct HashJoinExec {
+    /// left side
+    left: Arc<dyn ExecutionPlan>,
+    /// right side
+    right: Arc<dyn ExecutionPlan>,
+    /// Set of common columns used to join on
+    on: HashSet<String>,

Review comment:
   After thinking about this some more, I think it might be better to address this in the current PR. I would suggest either:
   
   ```rust
   left_keys: Vec<String>,
   right_keys: Vec<String>,
   ```
   
   or the more general case:
   
   ```rust
   left_keys: Vec<Arc<dyn PhysicalExpr>>,
   right_keys: Vec<Arc<dyn PhysicalExpr>>,
   ```
   
   It should be possible to perform an equi-join using any deterministic expression, for example:
   
   ```sql
   SELECT a.*, b.* FROM a JOIN b on UPPER(a.name) = UPPER(CONCAT(b.first, ' ', b.last))
   ```
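The general form can be sketched without DataFusion types: each side supplies its own key expression, evaluated per row, and rows match when the two keys are equal. In this hypothetical `hash_join_on`, plain closures stand in for the per-side key expressions proposed in the comment, and the row structs `A` and `B` are made up to mirror the SQL example.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical in-memory inner equi-join: `left_key` and `right_key` play
/// the role of per-side key expressions. Returns (left row, right row) pairs.
fn hash_join_on<L, R, K, FL, FR>(
    left: &[L],
    right: &[R],
    left_key: FL,
    right_key: FR,
) -> Vec<(usize, usize)>
where
    K: Eq + Hash,
    FL: Fn(&L) -> K,
    FR: Fn(&R) -> K,
{
    // Build: evaluate the left key expression once per left row.
    let mut table: HashMap<K, Vec<usize>> = HashMap::new();
    for (i, row) in left.iter().enumerate() {
        table.entry(left_key(row)).or_default().push(i);
    }
    // Probe: evaluate the right key expression and look up matches.
    let mut out = Vec::new();
    for (j, row) in right.iter().enumerate() {
        if let Some(matches) = table.get(&right_key(row)) {
            for &i in matches {
                out.push((i, j));
            }
        }
    }
    out
}

/// Hypothetical row types mirroring tables `a` and `b` in the SQL above.
struct A {
    name: String,
}
struct B {
    first: String,
    last: String,
}

fn main() {
    let a = vec![A { name: "Ada Lovelace".to_string() }, A { name: "Alan Turing".to_string() }];
    let b = vec![B { first: "alan".to_string(), last: "turing".to_string() }];
    // ON UPPER(a.name) = UPPER(CONCAT(b.first, ' ', b.last))
    let pairs = hash_join_on(
        &a,
        &b,
        |r: &A| r.name.to_uppercase(),
        |r: &B| format!("{} {}", r.first, r.last).to_uppercase(),
    );
    println!("{:?}", pairs);
}
```

The key expressions only need to be deterministic: the build side hashes whatever the left expression returns, and the probe side must produce an equal value for a match.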
   









[GitHub] [arrow] andygrove commented on a change in pull request #8709: ARROW-9555: [Rust] [DataFusion] Implement physical node for inner join

2020-11-19 Thread GitBox


andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r526837738



##
File path: rust/datafusion/src/physical_plan/hash_join.rs
##
@@ -0,0 +1,507 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the join plan for executing partitions in parallel and then joining the results
+//! into a set of partitions.
+
+use std::sync::Arc;
+use std::{
+    any::Any,
+    collections::{HashMap, HashSet},
+};
+
+use async_trait::async_trait;
+use futures::{Stream, StreamExt, TryStreamExt};
+
+use arrow::array::{make_array, Array, MutableArrayData};
+use arrow::datatypes::{Schema, SchemaRef};
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+
+use super::{expressions::col, hash_aggregate::create_key};
+use super::{
+    hash_utils::{build_join_schema, check_join_is_valid, JoinType},
+    merge::MergeExec,
+};
+use crate::error::{DataFusionError, Result};
+
+use super::{
+    group_scalar::GroupByScalar, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
+};
+
+// An index of (batch, row) uniquely identifying a row in a part.
+type Index = (usize, usize);
+// A pair (left index, right index)
+// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different
+// as a left join may issue None indices, in which case
+type JoinIndex = Option<(usize, usize)>;
+// Maps ["on" value] -> [list of indices with this key's value]
+// E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
+// for rows 3 and 8 from batch 0 and row 6 from batch 1.
+type JoinHashMap = HashMap<Vec<GroupByScalar>, Vec<Index>>;
+type JoinLeftData = (JoinHashMap, Vec<RecordBatch>);
+
+/// join execution plan executes partitions in parallel and combines them into a set of
+/// partitions.
+#[derive(Debug)]
+pub struct HashJoinExec {
+    /// left side
+    left: Arc<dyn ExecutionPlan>,
+    /// right side
+    right: Arc<dyn ExecutionPlan>,
+    /// Set of common columns used to join on
+    on: HashSet<String>,
+    /// How the join is performed
+    join_type: JoinType,
+    /// The schema once the join is applied
+    schema: SchemaRef,
+}
+
+impl HashJoinExec {
+    /// Tries to create a new [HashJoinExec].
+    /// # Error
+    /// This function errors when it is not possible to join the left and right sides on keys `on`.
+    pub fn try_new(
+        left: Arc<dyn ExecutionPlan>,
+        right: Arc<dyn ExecutionPlan>,
+        on: &HashSet<String>,
+        join_type: &JoinType,
+    ) -> Result<Self> {
+        let left_schema = left.schema();
+        let right_schema = right.schema();
+        check_join_is_valid(&left_schema, &right_schema, &on)?;
+
+        let on = on.iter().map(|s| s.clone()).collect::<HashSet<_>>();
+
+        let schema = Arc::new(build_join_schema(
+            &left_schema,
+            &right_schema,
+            &on,
+            &join_type,
+        ));
+
+        Ok(HashJoinExec {
+            left,
+            right,
+            on: on.clone(),
+            join_type: join_type.clone(),
+            schema,
+        })
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for HashJoinExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.left.clone(), self.right.clone()]
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            2 => Ok(Arc::new(HashJoinExec::try_new(
+                children[0].clone(),
+                children[1].clone(),
+                &self.on,
+                &self.join_type,
+            )?)),
+            _ => Err(DataFusionError::Internal(
+                "HashJoinExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        self.right.output_partitioning()
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        // merge all parts into a single stream
+        // this is currently expensive as we re-compute this for every part from the right
+        // TODO: Fix this issue: we can't share this state across parts on the right.

Review comment:
   I would be fine with us doing this as a follow-up and getting the expensive version of this 

[GitHub] [arrow] andygrove commented on a change in pull request #8709: ARROW-9555: [Rust] [DataFusion] Implement physical node for inner join

2020-11-19 Thread GitBox


andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r526834131



##
File path: rust/datafusion/src/physical_plan/hash_join.rs
##
@@ -0,0 +1,507 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the join plan for executing partitions in parallel and then joining the results
+//! into a set of partitions.
+
+use std::sync::Arc;
+use std::{
+    any::Any,
+    collections::{HashMap, HashSet},
+};
+
+use async_trait::async_trait;
+use futures::{Stream, StreamExt, TryStreamExt};
+
+use arrow::array::{make_array, Array, MutableArrayData};
+use arrow::datatypes::{Schema, SchemaRef};
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+
+use super::{expressions::col, hash_aggregate::create_key};
+use super::{
+    hash_utils::{build_join_schema, check_join_is_valid, JoinType},
+    merge::MergeExec,
+};
+use crate::error::{DataFusionError, Result};
+
+use super::{
+    group_scalar::GroupByScalar, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
+};
+
+// An index of (batch, row) uniquely identifying a row in a part.
+type Index = (usize, usize);
+// A pair (left index, right index)
+// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different
+// as a left join may issue None indices, in which case
+type JoinIndex = Option<(usize, usize)>;
+// Maps ["on" value] -> [list of indices with this key's value]
+// E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
+// for rows 3 and 8 from batch 0 and row 6 from batch 1.
+type JoinHashMap = HashMap<Vec<GroupByScalar>, Vec<Index>>;
+type JoinLeftData = (JoinHashMap, Vec<RecordBatch>);
+
+/// join execution plan executes partitions in parallel and combines them into a set of
+/// partitions.
+#[derive(Debug)]
+pub struct HashJoinExec {
+    /// left side
+    left: Arc<dyn ExecutionPlan>,
+    /// right side
+    right: Arc<dyn ExecutionPlan>,
+    /// Set of common columns used to join on
+    on: HashSet<String>,
+    /// How the join is performed
+    join_type: JoinType,
+    /// The schema once the join is applied
+    schema: SchemaRef,
+}
+
+impl HashJoinExec {
+    /// Tries to create a new [HashJoinExec].
+    /// # Error
+    /// This function errors when it is not possible to join the left and right sides on keys `on`.
+    pub fn try_new(
+        left: Arc<dyn ExecutionPlan>,
+        right: Arc<dyn ExecutionPlan>,
+        on: &HashSet<String>,
+        join_type: &JoinType,
+    ) -> Result<Self> {
+        let left_schema = left.schema();
+        let right_schema = right.schema();
+        check_join_is_valid(&left_schema, &right_schema, &on)?;
+
+        let on = on.iter().map(|s| s.clone()).collect::<HashSet<_>>();
+
+        let schema = Arc::new(build_join_schema(
+            &left_schema,
+            &right_schema,
+            &on,
+            &join_type,
+        ));
+
+        Ok(HashJoinExec {
+            left,
+            right,
+            on: on.clone(),
+            join_type: join_type.clone(),
+            schema,
+        })
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for HashJoinExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.left.clone(), self.right.clone()]
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            2 => Ok(Arc::new(HashJoinExec::try_new(
+                children[0].clone(),
+                children[1].clone(),
+                &self.on,
+                &self.join_type,
+            )?)),
+            _ => Err(DataFusionError::Internal(
+                "HashJoinExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        self.right.output_partitioning()
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        // merge all parts into a single stream
+        // this is currently expensive as we re-compute this for every part from the right
+        // TODO: Fix this issue: we can't share this state across parts on the right.

Review comment:
   When I tried to implement this, my approach was to store the 
materialized left-hand side 
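The archived comment is cut off at this point, so the reviewer's exact design is not recoverable. One hypothetical way to address the TODO quoted above ("we can't share this state across parts on the right") is to materialize the build side once and hand every right partition a shared, reference-counted handle to it. The sketch below is synchronous and std-only; `SharedBuildJoin`, `build_side`, and the `builds` counter are illustrative names, and since the real `execute` is async, an actual version would likely need an async-aware lock or once-cell instead of `std::sync::Mutex` (an assumption, not something the thread states).

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical build-side state: join key -> left row indices.
type BuildSide = HashMap<i64, Vec<usize>>;

/// Hypothetical join operator that materializes the left side at most once
/// and shares it across all right partitions.
struct SharedBuildJoin {
    left_keys: Vec<i64>,
    /// Lazily filled by the first partition that needs it.
    build: Mutex<Option<Arc<BuildSide>>>,
    /// Number of times the build side was computed (for illustration only).
    builds: AtomicUsize,
}

impl SharedBuildJoin {
    fn new(left_keys: Vec<i64>) -> Self {
        SharedBuildJoin {
            left_keys,
            build: Mutex::new(None),
            builds: AtomicUsize::new(0),
        }
    }

    /// Returns the shared build side, computing it only on the first call.
    /// The lock is held while building, so concurrent callers wait rather
    /// than building twice.
    fn build_side(&self) -> Arc<BuildSide> {
        let mut guard = self.build.lock().unwrap();
        if let Some(existing) = &*guard {
            return Arc::clone(existing);
        }
        let mut map: BuildSide = HashMap::new();
        for (i, key) in self.left_keys.iter().enumerate() {
            map.entry(*key).or_default().push(i);
        }
        self.builds.fetch_add(1, Ordering::SeqCst);
        let shared = Arc::new(map);
        *guard = Some(Arc::clone(&shared));
        shared
    }

    /// Probes one right partition; returns (left row, right row) pairs.
    fn execute(&self, right_partition: &[i64]) -> Vec<(usize, usize)> {
        let build = self.build_side();
        let mut out = Vec::new();
        for (j, key) in right_partition.iter().enumerate() {
            if let Some(rows) = build.get(key) {
                for &i in rows {
                    out.push((i, j));
                }
            }
        }
        out
    }
}

fn main() {
    let join = SharedBuildJoin::new(vec![1, 2, 1]);
    println!("{:?}", join.execute(&[1]));
    println!("{:?}", join.execute(&[2, 3]));
    println!("builds: {}", join.builds.load(Ordering::SeqCst));
}
```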

[GitHub] [arrow] andygrove commented on a change in pull request #8709: ARROW-9555: [Rust] [DataFusion] Implement physical node for inner join

2020-11-18 Thread GitBox


andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r526467098



##
File path: rust/datafusion/src/physical_plan/hash_join.rs
##
@@ -0,0 +1,467 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the join plan for executing partitions in parallel and then joining the results
+//! into a set of partitions.
+
+use std::sync::Arc;
+use std::{
+    any::Any,
+    collections::{HashMap, HashSet},
+};
+
+use async_trait::async_trait;
+use futures::{Stream, StreamExt, TryStreamExt};
+
+use arrow::array::{make_array, Array, MutableArrayData};
+use arrow::datatypes::{Schema, SchemaRef};
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+
+use super::hash_utils::{build_join_schema, check_join_is_valid, JoinHow};
+use super::{expressions::col, hash_aggregate::create_key};
+use crate::error::{DataFusionError, Result};
+
+use super::{
+    group_scalar::GroupByScalar, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
+};
+
+// An index of (batch, row) uniquely identifying a row in a part.
+type Index = (usize, usize);
+// None represents a null (e.g. in case of a left join, some right indices are null)
+type JoinIndex = (usize, usize);
+// A mapping "on" value -> list of row indexes with this key's value
+// E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
+// for rows 3 and 8 from batch 0 and row 6 from batch 1.
+type JoinHashMap = HashMap<Vec<GroupByScalar>, Vec<Index>>;
+type JoinLeftData = (JoinHashMap, Vec<RecordBatch>);
+
+/// join execution plan executes partitions in parallel and combines them into a set of
+/// partitions.
+#[derive(Debug)]
+pub struct HashJoinExec {
+    /// left side
+    left: Arc<dyn ExecutionPlan>,
+    /// right side
+    right: Arc<dyn ExecutionPlan>,
+    /// Set of common columns used to join on
+    on: HashSet<String>,

Review comment:
   This is a great start, but it will limit us to cases where the column names are the same in both tables. As a follow-on, we can expand this to support different names, e.g. `customer.id = orders.customer_id`. 
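A minimal std-only sketch of that follow-on, assuming the join keys are given as explicit (left column, right column) pairs rather than a single set of shared names, similar in spirit to the `JoinOn` pair alias that appears in a later revision of `hash_utils.rs` in this thread. `check_join_pairs` is a hypothetical counterpart to `check_join_is_valid`, and its error strings are illustrative.

```rust
use std::collections::HashSet;

/// Hypothetical validator for join keys given as (left column, right column)
/// pairs, so the two sides may use different names (e.g. `id` = `customer_id`).
fn check_join_pairs(
    left_columns: &[&str],
    right_columns: &[&str],
    on: &[(&str, &str)],
) -> Result<(), String> {
    if on.is_empty() {
        return Err("The 'on' clause of a join cannot be empty".to_string());
    }
    let left: HashSet<&str> = left_columns.iter().copied().collect();
    let right: HashSet<&str> = right_columns.iter().copied().collect();
    // Each pair only needs its left name on the left and its right name on the right.
    for (l, r) in on {
        if !left.contains(l) {
            return Err(format!("left side has no column {:?}", l));
        }
        if !right.contains(r) {
            return Err(format!("right side has no column {:?}", r));
        }
    }
    Ok(())
}

fn main() {
    // customer.id = orders.customer_id
    let result = check_join_pairs(&["id", "name"], &["order_id", "customer_id"], &[("id", "customer_id")]);
    println!("{:?}", result);
}
```

Unlike the set-intersection check, this no longer requires the key to exist under the same name on both sides.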









[GitHub] [arrow] andygrove commented on a change in pull request #8709: ARROW-9555: [Rust] [DataFusion] Implement physical node for inner join

2020-11-18 Thread GitBox


andygrove commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r526466282



##
File path: rust/datafusion/src/physical_plan/hash_utils.rs
##
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Functionality used both on logical and physical plans
+
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::{Field, Schema};
+use std::collections::HashSet;
+
+/// All valid types of joins.
+#[derive(Clone, Debug)]
+pub enum JoinHow {

Review comment:
   nit: `JoinType` would be a more standard name




