(arrow-datafusion) branch main updated (feb9100432 -> 118eecdc83)

2024-04-11 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


from feb9100432 fix#9501 (#10028)
 add 118eecdc83 Always pass DataType to PrimitiveDistinctCountAccumulator 
(#10047)

No new revisions were added by this update.

Summary of changes:
 .../src/aggregate/count_distinct/mod.rs| 103 -
 .../src/aggregate/count_distinct/native.rs |   9 +-
 2 files changed, 61 insertions(+), 51 deletions(-)



(arrow-datafusion) branch main updated: modify emit() of TopK (#10030)

2024-04-10 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 69595a4845 modify emit() of TopK (#10030)
69595a4845 is described below

commit 69595a48458715aadffc56974665ebbafee35bd7
Author: JasonLi 
AuthorDate: Thu Apr 11 04:10:27 2024 +0800

modify emit() of TopK (#10030)
---
 datafusion/physical-plan/src/topk/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/topk/mod.rs 
b/datafusion/physical-plan/src/topk/mod.rs
index 9120566273..6a77bfaf3c 100644
--- a/datafusion/physical-plan/src/topk/mod.rs
+++ b/datafusion/physical-plan/src/topk/mod.rs
@@ -208,7 +208,7 @@ impl TopK {
 // break into record batches as needed
 let mut batches = vec![];
 loop {
-if batch.num_rows() < batch_size {
+if batch.num_rows() <= batch_size {
 batches.push(Ok(batch));
 break;
 } else {



(arrow-datafusion) branch main updated: Implement semi/anti join output statistics estimation (#9800)

2024-03-30 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 21fe0b7762 Implement semi/anti join output statistics estimation 
(#9800)
21fe0b7762 is described below

commit 21fe0b7762d088731689750e2cef1762d4f9db5e
Author: Eduard Karacharov <13005055+kor...@users.noreply.github.com>
AuthorDate: Sat Mar 30 14:44:35 2024 +0200

Implement semi/anti join output statistics estimation (#9800)

* semi/anti join output statistics

* fix antijoin cardinality estimation
---
 datafusion/physical-plan/src/joins/utils.rs | 373 
 1 file changed, 323 insertions(+), 50 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/utils.rs 
b/datafusion/physical-plan/src/joins/utils.rs
index 1cb2b100e2..a3d20b97d1 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -825,27 +825,27 @@ fn estimate_join_cardinality(
 right_stats: Statistics,
 on: ,
 ) -> Option {
+let (left_col_stats, right_col_stats) = on
+.iter()
+.map(|(left, right)| {
+match (
+left.as_any().downcast_ref::(),
+right.as_any().downcast_ref::(),
+) {
+(Some(left), Some(right)) => (
+left_stats.column_statistics[left.index()].clone(),
+right_stats.column_statistics[right.index()].clone(),
+),
+_ => (
+ColumnStatistics::new_unknown(),
+ColumnStatistics::new_unknown(),
+),
+}
+})
+.unzip::<_, _, Vec<_>, Vec<_>>();
+
 match join_type {
 JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => 
{
-let (left_col_stats, right_col_stats) = on
-.iter()
-.map(|(left, right)| {
-match (
-left.as_any().downcast_ref::(),
-right.as_any().downcast_ref::(),
-) {
-(Some(left), Some(right)) => (
-left_stats.column_statistics[left.index()].clone(),
-
right_stats.column_statistics[right.index()].clone(),
-),
-_ => (
-ColumnStatistics::new_unknown(),
-ColumnStatistics::new_unknown(),
-),
-}
-})
-.unzip::<_, _, Vec<_>, Vec<_>>();
-
 let ij_cardinality = estimate_inner_join_cardinality(
 Statistics {
 num_rows: left_stats.num_rows.clone(),
@@ -888,10 +888,38 @@ fn estimate_join_cardinality(
 })
 }
 
-JoinType::LeftSemi
-| JoinType::RightSemi
-| JoinType::LeftAnti
-| JoinType::RightAnti => None,
+// For SemiJoins estimation result is either zero, in cases when inputs
+// are non-overlapping according to statistics, or equal to number of 
rows
+// for outer input
+JoinType::LeftSemi | JoinType::RightSemi => {
+let (outer_stats, inner_stats) = match join_type {
+JoinType::LeftSemi => (left_stats, right_stats),
+_ => (right_stats, left_stats),
+};
+let cardinality = match estimate_disjoint_inputs(_stats, 
_stats) {
+Some(estimation) => *estimation.get_value()?,
+None => *outer_stats.num_rows.get_value()?,
+};
+
+Some(PartialJoinStatistics {
+num_rows: cardinality,
+column_statistics: outer_stats.column_statistics,
+})
+}
+
+// For AntiJoins estimation always equals to outer statistics, as
+// non-overlapping inputs won't affect estimation
+JoinType::LeftAnti | JoinType::RightAnti => {
+let outer_stats = match join_type {
+JoinType::LeftAnti => left_stats,
+_ => right_stats,
+};
+
+Some(PartialJoinStatistics {
+num_rows: *outer_stats.num_rows.get_value()?,
+column_statistics: outer_stats.column_statistics,
+})
+}
 }
 }
 
@@ -903,6 +931,11 @@ fn estimate_inner_join_cardinality(
 left_stats: Statistics,
 right_stats: Statistics,
 ) -> Option> {
+// Immediatedly return if inputs considered as non-overlapping
+if let Some(estimation) = estimate_disjoint_inputs(_stats, 
_stats) {
+return Some(estimation);
+};
+
 // The algorithm here is partly based on th

(arrow-datafusion) branch main updated: Run TPC-H SF10 during PR benchmarks (#9822)

2024-03-27 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 7f4b338d6f Run TPC-H SF10 during PR benchmarks (#9822)
7f4b338d6f is described below

commit 7f4b338d6f7e4434529b87ca9eb273c7fb8819ec
Author: Marko Grujic 
AuthorDate: Wed Mar 27 22:52:58 2024 +0100

Run TPC-H SF10 during PR benchmarks (#9822)

* Run TPC-H SF10 during PR benchmarks

* Add memory benchmarks to the workflow

Also distinguish the output file by the SF used.
---
 .github/workflows/pr_benchmarks.yml | 11 +--
 benchmarks/bench.sh |  4 ++--
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/pr_benchmarks.yml 
b/.github/workflows/pr_benchmarks.yml
index b7b85c9fcf..29d001783b 100644
--- a/.github/workflows/pr_benchmarks.yml
+++ b/.github/workflows/pr_benchmarks.yml
@@ -28,9 +28,10 @@ jobs:
   cd benchmarks
   mkdir data
   
-  # Setup the TPC-H data set with a scale factor of 10
+  # Setup the TPC-H data sets for scale factors 1 and 10
   ./bench.sh data tpch
-  
+  ./bench.sh data tpch10
+
   - name: Generate unique result names
 run: |
   echo "HEAD_LONG_SHA=$(git log -1 --format='%H')" >> "$GITHUB_ENV" 
@@ -44,6 +45,9 @@ jobs:
   cd benchmarks
 
   ./bench.sh run tpch
+  ./bench.sh run tpch_mem
+  ./bench.sh run tpch10
+  ./bench.sh run tpch_mem10
   
   # For some reason this step doesn't seem to propagate the env var 
down into the script
   if [ -d "results/HEAD" ]; then
@@ -64,6 +68,9 @@ jobs:
   cd benchmarks
 
   ./bench.sh run tpch
+  ./bench.sh run tpch_mem
+  ./bench.sh run tpch10
+  ./bench.sh run tpch_mem10
   
   echo ${{ github.event.issue.number }} > pr
   
diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh
index 039f4790ac..a724008927 100755
--- a/benchmarks/bench.sh
+++ b/benchmarks/bench.sh
@@ -314,7 +314,7 @@ run_tpch() {
 fi
 TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}"
 
-RESULTS_FILE="${RESULTS_DIR}/tpch.json"
+RESULTS_FILE="${RESULTS_DIR}/tpch_sf${SCALE_FACTOR}.json"
 echo "RESULTS_FILE: ${RESULTS_FILE}"
 echo "Running tpch benchmark..."
 $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path 
"${TPCH_DIR}" --format parquet -o ${RESULTS_FILE}
@@ -329,7 +329,7 @@ run_tpch_mem() {
 fi
 TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}"
 
-RESULTS_FILE="${RESULTS_DIR}/tpch_mem.json"
+RESULTS_FILE="${RESULTS_DIR}/tpch_mem_sf${SCALE_FACTOR}.json"
 echo "RESULTS_FILE: ${RESULTS_FILE}"
 echo "Running tpch_mem benchmark..."
 # -m means in memory



(arrow-datafusion) branch main updated: Support IGNORE NULLS for NTH_VALUE window function (#9625)

2024-03-18 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 40bf0ea183 Support IGNORE NULLS for NTH_VALUE window function (#9625)
40bf0ea183 is described below

commit 40bf0ea183b4aa8ecab01f8816f0fb83328cd262
Author: Huaxin Gao 
AuthorDate: Mon Mar 18 05:20:36 2024 -0700

Support IGNORE NULLS for NTH_VALUE window function (#9625)

Co-authored-by: Huaxin Gao 
---
 datafusion/physical-expr/src/window/nth_value.rs |  32 ++--
 datafusion/physical-plan/src/windows/mod.rs  |   1 -
 datafusion/sqllogictest/test_files/window.slt| 182 +++
 3 files changed, 205 insertions(+), 10 deletions(-)

diff --git a/datafusion/physical-expr/src/window/nth_value.rs 
b/datafusion/physical-expr/src/window/nth_value.rs
index 5c7c891f92..e913f39333 100644
--- a/datafusion/physical-expr/src/window/nth_value.rs
+++ b/datafusion/physical-expr/src/window/nth_value.rs
@@ -83,19 +83,16 @@ impl NthValue {
 name: impl Into,
 expr: Arc,
 data_type: DataType,
-n: u32,
+n: i64,
 ignore_nulls: bool,
 ) -> Result {
-if ignore_nulls {
-return exec_err!("NTH_VALUE ignore_nulls is not supported yet");
-}
 match n {
 0 => exec_err!("NTH_VALUE expects n to be non-zero"),
 _ => Ok(Self {
 name: name.into(),
 expr,
 data_type,
-kind: NthValueKind::Nth(n as i64),
+kind: NthValueKind::Nth(n),
 ignore_nulls,
 }),
 }
@@ -267,20 +264,37 @@ impl PartitionEvaluator for NthValueEvaluator {
 if index >= n_range {
 // Outside the range, return NULL:
 ScalarValue::try_from(arr.data_type())
+} else if self.ignore_nulls {
+let valid_indices = valid_indices.unwrap();
+if index >= valid_indices.len() {
+return 
ScalarValue::try_from(arr.data_type());
+}
+ScalarValue::try_from_array(
+(),
+valid_indices[index],
+)
 } else {
 ScalarValue::try_from_array(arr, range.start + 
index)
 }
 }
 Ordering::Less => {
 let reverse_index = (-n) as usize;
-if n_range >= reverse_index {
+if n_range < reverse_index {
+// Outside the range, return NULL:
+ScalarValue::try_from(arr.data_type())
+} else if self.ignore_nulls {
+let valid_indices = valid_indices.unwrap();
+if reverse_index > valid_indices.len() {
+return 
ScalarValue::try_from(arr.data_type());
+}
+let new_index =
+valid_indices[valid_indices.len() - 
reverse_index];
+ScalarValue::try_from_array((), 
new_index)
+} else {
 ScalarValue::try_from_array(
 arr,
 range.start + n_range - reverse_index,
 )
-} else {
-// Outside the range, return NULL:
-ScalarValue::try_from(arr.data_type())
 }
 }
 Ordering::Equal => {
diff --git a/datafusion/physical-plan/src/windows/mod.rs 
b/datafusion/physical-plan/src/windows/mod.rs
index 6712bc855f..da2b24487d 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -249,7 +249,6 @@ fn create_built_in_window_expr(
 .clone()
 .try_into()
 .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
-let n: u32 = n as u32;
 Arc::new(NthValue::nth(
 name,
 arg,
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index efb180b10c..a309a97137 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllo

(arrow-datafusion) branch main updated (451d13a88a -> 6e90f01c41)

2024-03-16 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


from 451d13a88a Remove constant expressions from SortExprs in the SortExec 
(#9618)
 add 6e90f01c41 Try fixing missing results name in the benchmark step 
(#9632)

No new revisions were added by this update.

Summary of changes:
 .github/workflows/pr_benchmarks.yml | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)



(arrow-datafusion) branch main updated (3d130915b4 -> 451d13a88a)

2024-03-15 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


from 3d130915b4 Enable TTY during bench data gen (#9626)
 add 451d13a88a Remove constant expressions from SortExprs in the SortExec 
(#9618)

No new revisions were added by this update.

Summary of changes:
 .../src/physical_optimizer/enforce_distribution.rs | 35 +++-
 .../core/src/physical_optimizer/enforce_sorting.rs | 10 ++
 datafusion/core/src/physical_optimizer/utils.rs| 27 ++-
 .../physical-expr/src/equivalence/properties.rs|  2 +-
 datafusion/sqllogictest/test_files/window.slt  | 38 ++
 5 files changed, 72 insertions(+), 40 deletions(-)



(arrow-datafusion) branch main updated (00a122545d -> 3d130915b4)

2024-03-15 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


from 00a122545d Remove datafusion-functions-array dependency from 
datafusion-optimizer (#9621)
 add 3d130915b4 Enable TTY during bench data gen (#9626)

No new revisions were added by this update.

Summary of changes:
 .github/workflows/pr_benchmarks.yml | 2 ++
 benchmarks/README.md| 2 ++
 benchmarks/bench.sh | 4 ++--
 3 files changed, 6 insertions(+), 2 deletions(-)



(arrow-datafusion) branch main updated (3b61004a02 -> c2787c7d43)

2024-03-13 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


from 3b61004a02 Remove physical expr of NamedStructField, convert to 
`get_field` function call (#9563)
 add c2787c7d43 Add `/benchmark` github command to comparison benchmark 
between base and pr commit (#9461)

No new revisions were added by this update.

Summary of changes:
 .github/workflows/pr_benchmarks.yml | 87 +
 .github/workflows/pr_comment.yml| 53 ++
 benchmarks/bench.sh | 12 ++---
 3 files changed, 147 insertions(+), 5 deletions(-)
 create mode 100644 .github/workflows/pr_benchmarks.yml
 create mode 100644 .github/workflows/pr_comment.yml



(arrow-datafusion) branch main updated (78bb64e03d -> 0302d6530a)

2024-03-13 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


from 78bb64e03d Issue-9565 - Port ArrayRepeat to function-arrays subcrate 
(#9568)
 add 0302d6530a Fix ApproxPercentileAccumulator on zero values (#9582)

No new revisions were added by this update.

Summary of changes:
 datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs | 6 +++---
 datafusion/sqllogictest/test_files/aggregate.slt | 5 +++--
 2 files changed, 6 insertions(+), 5 deletions(-)



(arrow-datafusion) branch main updated (22585586bf -> 3050699580)

2024-02-25 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


from 22585586bf Substrait: Support ScalarUDF (#9337)
 add 3050699580 Make agg_func_substitute test deterministic (#9340)

No new revisions were added by this update.

Summary of changes:
 datafusion/sqllogictest/test_files/agg_func_substitute.slt | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)



(arrow-datafusion) branch main updated: Support CopyTo::partition_by in datafusion proto (#9306)

2024-02-25 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 148b4d22f1 Support CopyTo::partition_by in datafusion proto (#9306)
148b4d22f1 is described below

commit 148b4d22f1d6de2bc6269ab96ba2e48e4735a9f9
Author: Hoang Pham 
AuthorDate: Sun Feb 25 20:36:56 2024 +0700

Support CopyTo::partition_by in datafusion proto (#9306)

* add support for CopyTo::partition_by in proto

Signed-off-by: Hoang Pham 

* simplify partition_by logic

Signed-off-by: Hoang Pham 

-

Signed-off-by: Hoang Pham 
---
 datafusion/expr/src/logical_plan/dml.rs|  2 +-
 datafusion/proto/proto/datafusion.proto|  1 +
 datafusion/proto/src/generated/pbjson.rs   | 18 ++
 datafusion/proto/src/generated/prost.rs|  2 ++
 datafusion/proto/src/logical_plan/mod.rs   |  6 --
 datafusion/proto/tests/cases/roundtrip_logical_plan.rs | 11 +++
 6 files changed, 33 insertions(+), 7 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/dml.rs 
b/datafusion/expr/src/logical_plan/dml.rs
index a55781eda6..7f04bd8973 100644
--- a/datafusion/expr/src/logical_plan/dml.rs
+++ b/datafusion/expr/src/logical_plan/dml.rs
@@ -36,7 +36,7 @@ pub struct CopyTo {
 pub output_url: String,
 /// The file format to output (explicitly defined or inferred from file 
extension)
 pub file_format: FileType,
-/// Detmines which, if any, columns should be used for hive-style 
partitioned writes
+/// Determines which, if any, columns should be used for hive-style 
partitioned writes
 pub partition_by: Vec,
 /// Arbitrary options as tuples
 pub copy_options: CopyOptions,
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index e779e29cb8..7673ce86ae 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -327,6 +327,7 @@ message CopyToNode {
 FileTypeWriterOptions writer_options = 5;
 }
 string file_type = 6;
+repeated string partition_by = 7;
 }
 
 message SQLOptions {
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index f5f15aa3e4..65483f9ac4 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -3795,6 +3795,9 @@ impl serde::Serialize for CopyToNode {
 if !self.file_type.is_empty() {
 len += 1;
 }
+if !self.partition_by.is_empty() {
+len += 1;
+}
 if self.copy_options.is_some() {
 len += 1;
 }
@@ -3808,6 +3811,9 @@ impl serde::Serialize for CopyToNode {
 if !self.file_type.is_empty() {
 struct_ser.serialize_field("fileType", _type)?;
 }
+if !self.partition_by.is_empty() {
+struct_ser.serialize_field("partitionBy", _by)?;
+}
 if let Some(v) = self.copy_options.as_ref() {
 match v {
 copy_to_node::CopyOptions::SqlOptions(v) => {
@@ -3833,6 +3839,8 @@ impl<'de> serde::Deserialize<'de> for CopyToNode {
 "outputUrl",
 "file_type",
 "fileType",
+"partition_by",
+"partitionBy",
 "sql_options",
 "sqlOptions",
 "writer_options",
@@ -3844,6 +3852,7 @@ impl<'de> serde::Deserialize<'de> for CopyToNode {
 Input,
 OutputUrl,
 FileType,
+PartitionBy,
 SqlOptions,
 WriterOptions,
 }
@@ -3870,6 +3879,7 @@ impl<'de> serde::Deserialize<'de> for CopyToNode {
 "input" => Ok(GeneratedField::Input),
 "outputUrl" | "output_url" => 
Ok(GeneratedField::OutputUrl),
 "fileType" | "file_type" => 
Ok(GeneratedField::FileType),
+"partitionBy" | "partition_by" => 
Ok(GeneratedField::PartitionBy),
 "sqlOptions" | "sql_options" => 
Ok(GeneratedField::SqlOptions),
 "writerOptions" | "writer_options" => 
Ok(GeneratedField::WriterOptions),
 _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
@@ -3894,6 +3904,7 @@ impl<'de> serde::Deserialize<'de> for CopyToNode {
 let mut input__ = None;
 let mut output_url__ = None;
 

(arrow-datafusion) branch main updated: chore: statically link xz2 (#9311)

2024-02-24 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 73c08b5045 chore: statically link xz2 (#9311)
73c08b5045 is described below

commit 73c08b5045fc627c3ed81d1bcc5234a7436a8e38
Author: universalmind303 
AuthorDate: Sat Feb 24 10:25:04 2024 -0600

chore: statically link xz2 (#9311)

* chore: statically link xz2

* toml fmt
---
 datafusion/core/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 874e455132..506be6667e 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -116,7 +116,7 @@ tokio = { workspace = true }
 tokio-util = { version = "0.7.4", features = ["io"], optional = true }
 url = { workspace = true }
 uuid = { version = "1.0", features = ["v4"] }
-xz2 = { version = "0.1", optional = true }
+xz2 = { version = "0.1", optional = true, features = ["static"] }
 zstd = { version = "0.13", optional = true, default-features = false }
 
 [dev-dependencies]



(arrow-datafusion) branch main updated (91f3eb2e54 -> 02c948d91b)

2024-02-22 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


from 91f3eb2e54 docs: update contributor guide (format toml/inte test) 
(#9301)
 add 02c948d91b [MINOR]: Limit stream replace with slice (#9303)

No new revisions were added by this update.

Summary of changes:
 datafusion/physical-plan/src/limit.rs | 25 +++--
 1 file changed, 7 insertions(+), 18 deletions(-)



(arrow-datafusion) branch main updated: Improve datafusion-cli print format tests (#8896)

2024-01-19 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 3a9e23d138 Improve datafusion-cli print format tests (#8896)
3a9e23d138 is described below

commit 3a9e23d138c935ffea68408899016c9323aa0f36
Author: Andrew Lamb 
AuthorDate: Fri Jan 19 04:56:58 2024 -0500

Improve datafusion-cli print format tests (#8896)
---
 datafusion-cli/src/print_format.rs | 415 +
 1 file changed, 283 insertions(+), 132 deletions(-)

diff --git a/datafusion-cli/src/print_format.rs 
b/datafusion-cli/src/print_format.rs
index 0a8c7b4b3e..2de52be612 100644
--- a/datafusion-cli/src/print_format.rs
+++ b/datafusion-cli/src/print_format.rs
@@ -190,117 +190,212 @@ impl PrintFormat {
 
 #[cfg(test)]
 mod tests {
-use std::io::{Cursor, Read, Write};
-use std::sync::Arc;
-
 use super::*;
+use std::sync::Arc;
 
 use arrow::array::{ArrayRef, Int32Array};
 use arrow::datatypes::{DataType, Field, Schema};
-use datafusion::error::Result;
-
-fn run_test(batches: &[RecordBatch], test_fn: F) -> Result
-where
-F: Fn( Cursor>, &[RecordBatch]) -> Result<()>,
-{
-let mut buffer = Cursor::new(Vec::new());
-test_fn( buffer, batches)?;
-buffer.set_position(0);
-let mut contents = String::new();
-buffer.read_to_string( contents)?;
-Ok(contents)
+
+#[test]
+fn print_empty() {
+for format in [
+PrintFormat::Csv,
+PrintFormat::Tsv,
+PrintFormat::Table,
+PrintFormat::Json,
+PrintFormat::NdJson,
+PrintFormat::Automatic,
+] {
+// no output for empty batches, even with header set
+PrintBatchesTest::new()
+.with_format(format)
+.with_batches(vec![])
+.with_expected(&[""])
+.run();
+}
 }
 
 #[test]
-fn test_print_batches_with_sep() -> Result<()> {
-let contents = run_test(&[], |buffer, batches| {
-print_batches_with_sep(buffer, batches, b',', true)
-})?;
-assert_eq!(contents, "");
+fn print_csv_no_header() {
+#[rustfmt::skip]
+let expected = &[
+"1,4,7",
+"2,5,8",
+"3,6,9",
+];
 
-let schema = Arc::new(Schema::new(vec![
-Field::new("a", DataType::Int32, false),
-Field::new("b", DataType::Int32, false),
-Field::new("c", DataType::Int32, false),
-]));
-let batch = RecordBatch::try_new(
-schema,
-vec![
-Arc::new(Int32Array::from(vec![1, 2, 3])),
-Arc::new(Int32Array::from(vec![4, 5, 6])),
-Arc::new(Int32Array::from(vec![7, 8, 9])),
-],
-)?;
+PrintBatchesTest::new()
+.with_format(PrintFormat::Csv)
+.with_batches(split_batch(three_column_batch()))
+.with_header(WithHeader::No)
+.with_expected(expected)
+.run();
+}
 
-let contents = run_test(&[batch], |buffer, batches| {
-print_batches_with_sep(buffer, batches, b',', true)
-})?;
-assert_eq!(contents, "a,b,c\n1,4,7\n2,5,8\n3,6,9\n");
+#[test]
+fn print_csv_with_header() {
+#[rustfmt::skip]
+let expected = &[
+"a,b,c",
+"1,4,7",
+"2,5,8",
+"3,6,9",
+];
 
-Ok(())
+PrintBatchesTest::new()
+.with_format(PrintFormat::Csv)
+.with_batches(split_batch(three_column_batch()))
+.with_header(WithHeader::Yes)
+.with_expected(expected)
+.run();
 }
 
 #[test]
-fn test_print_batches_to_json_empty() -> Result<()> {
-let contents = run_test(&[], |buffer, batches| {
-batches_to_json!(ArrayWriter, buffer, batches)
-})?;
-assert_eq!(contents, "");
+fn print_tsv_no_header() {
+#[rustfmt::skip]
+let expected = &[
+"1\t4\t7",
+"2\t5\t8",
+"3\t6\t9",
+];
 
-let contents = run_test(&[], |buffer, batches| {
-batches_to_json!(LineDelimitedWriter, buffer, batches)
-})?;
-assert_eq!(contents, "");
+PrintBatchesTest::new()
+.with_format(PrintFormat::Tsv)
+.with_batches(split_batch(three_column_batch()))
+.with_header(WithHeader::No)
+.with_expe

(arrow-datafusion) branch main updated (4cde998f1e -> 08de64d377)

2024-01-16 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


from 4cde998f1e fix: don't extract common sub expr in `CASE WHEN` clause 
(#8833)
 add 08de64d377 Add "Extended" clickbench queries (#8861)

No new revisions were added by this update.

Summary of changes:
 benchmarks/bench.sh| 17 
 benchmarks/queries/clickbench/README.md| 33 +++
 benchmarks/queries/clickbench/README.txt   |  1 -
 benchmarks/queries/clickbench/extended.sql |  1 +
 benchmarks/src/clickbench.rs   | 66 +++---
 5 files changed, 92 insertions(+), 26 deletions(-)
 create mode 100644 benchmarks/queries/clickbench/README.md
 delete mode 100644 benchmarks/queries/clickbench/README.txt
 create mode 100644 benchmarks/queries/clickbench/extended.sql



(arrow-datafusion) branch main updated: Remove need for sort in new_with_metadata (#8855)

2024-01-15 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new ff728d6c75 Remove need for sort in new_with_metadata (#8855)
ff728d6c75 is described below

commit ff728d6c75eb0eef048d1b2f61a73bf750d2814e
Author: Simon Vandel Sillesen 
AuthorDate: Mon Jan 15 08:21:38 2024 +

Remove need for sort in new_with_metadata (#8855)

BTreeMap gives stable iteration order, so we don't need to sort

Speeds up benchmarks in sql_planner.rs by 3-8%
---
 datafusion/common/src/dfschema.rs | 16 +---
 1 file changed, 5 insertions(+), 11 deletions(-)

diff --git a/datafusion/common/src/dfschema.rs 
b/datafusion/common/src/dfschema.rs
index 85b97aac03..c715fad112 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -18,7 +18,7 @@
 //! DFSchema is an extended schema struct that DataFusion uses to provide 
support for
 //! fields with optional relation names.
 
-use std::collections::{HashMap, HashSet};
+use std::collections::{BTreeSet, HashMap};
 use std::convert::TryFrom;
 use std::fmt::{Display, Formatter};
 use std::hash::Hash;
@@ -135,8 +135,8 @@ impl DFSchema {
 fields: Vec,
 metadata: HashMap,
 ) -> Result {
-let mut qualified_names = HashSet::new();
-let mut unqualified_names = HashSet::new();
+let mut qualified_names = BTreeSet::new();
+let mut unqualified_names = BTreeSet::new();
 
 for field in  {
 if let Some(qualifier) = field.qualifier() {
@@ -148,14 +148,8 @@ impl DFSchema {
 }
 }
 
-// check for mix of qualified and unqualified field with same 
unqualified name
-// note that we need to sort the contents of the HashSet first so that 
errors are
-// deterministic
-let mut qualified_names = qualified_names
-.iter()
-.map(|(l, r)| (l.to_owned(), r.to_owned()))
-.collect::>();
-qualified_names.sort();
+// Check for mix of qualified and unqualified fields with same 
unqualified name.
+// The BTreeSet storage makes sure that errors are reported in 
deterministic order.
 for (qualifier, name) in _names {
 if unqualified_names.contains(name) {
 return _schema_err!(SchemaError::AmbiguousReference {



(arrow-datafusion) branch main updated: Use standard tree walk in Projection Pushdown (#8787)

2024-01-09 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new f5ca078f96 Use standard tree walk in Projection Pushdown (#8787)
f5ca078f96 is described below

commit f5ca078f964b2a19db41e85640b1461508d4b8a6
Author: Andrew Lamb 
AuthorDate: Tue Jan 9 07:02:40 2024 -0500

Use standard tree walk in Projection Pushdown (#8787)
---
 datafusion/optimizer/src/optimize_projections.rs | 147 +--
 1 file changed, 29 insertions(+), 118 deletions(-)

diff --git a/datafusion/optimizer/src/optimize_projections.rs 
b/datafusion/optimizer/src/optimize_projections.rs
index 1d4eda0bd2..d9c4551097 100644
--- a/datafusion/optimizer/src/optimize_projections.rs
+++ b/datafusion/optimizer/src/optimize_projections.rs
@@ -36,9 +36,10 @@ use datafusion_common::{
 use datafusion_expr::expr::{Alias, ScalarFunction, ScalarFunctionDefinition};
 use datafusion_expr::{
 logical_plan::LogicalPlan, projection_schema, Aggregate, BinaryExpr, Cast, 
Distinct,
-Expr, GroupingSet, Projection, TableScan, Window,
+Expr, Projection, TableScan, Window,
 };
 
+use datafusion_expr::utils::inspect_expr_pre;
 use hashbrown::HashMap;
 use itertools::{izip, Itertools};
 
@@ -531,7 +532,7 @@ macro_rules! rewrite_expr_with_check {
 ///
 /// - `Ok(Some(Expr))`: Rewrite was successful. Contains the rewritten result.
 /// - `Ok(None)`: Signals that `expr` can not be rewritten.
-/// - `Err(error)`: An error occured during the function call.
+/// - `Err(error)`: An error occurred during the function call.
 fn rewrite_expr(expr: , input: ) -> Result> {
 let result = match expr {
 Expr::Column(col) => {
@@ -574,23 +575,7 @@ fn rewrite_expr(expr: , input: ) -> 
Result> {
 Ok(Some(result))
 }
 
-/// Retrieves a set of outer-referenced columns by the given expression, 
`expr`.
-/// Note that the `Expr::to_columns()` function doesn't return these columns.
-///
-/// # Parameters
-///
-/// * `expr` - The expression to analyze for outer-referenced columns.
-///
-/// # Returns
-///
-/// returns a `HashSet` containing all outer-referenced columns.
-fn outer_columns(expr: ) -> HashSet {
-let mut columns = HashSet::new();
-outer_columns_helper(expr,  columns);
-columns
-}
-
-/// A recursive subroutine that accumulates outer-referenced columns by the
+/// Accumulates outer-referenced columns by the
 /// given expression, `expr`.
 ///
 /// # Parameters
@@ -598,105 +583,31 @@ fn outer_columns(expr: ) -> HashSet {
 /// * `expr` - The expression to analyze for outer-referenced columns.
 /// * `columns` - A mutable reference to a `HashSet` where detected
 ///   columns are collected.
-fn outer_columns_helper(expr: , columns:  HashSet) {
-match expr {
-Expr::OuterReferenceColumn(_, col) => {
-columns.insert(col.clone());
-}
-Expr::BinaryExpr(binary_expr) => {
-outer_columns_helper(_expr.left, columns);
-outer_columns_helper(_expr.right, columns);
-}
-Expr::ScalarSubquery(subquery) => {
-let exprs = subquery.outer_ref_columns.iter();
-outer_columns_helper_multi(exprs, columns);
-}
-Expr::Exists(exists) => {
-let exprs = exists.subquery.outer_ref_columns.iter();
-outer_columns_helper_multi(exprs, columns);
-}
-Expr::Alias(alias) => outer_columns_helper(, columns),
-Expr::InSubquery(insubquery) => {
-let exprs = insubquery.subquery.outer_ref_columns.iter();
-outer_columns_helper_multi(exprs, columns);
-}
-Expr::Cast(cast) => outer_columns_helper(, columns),
-Expr::Sort(sort) => outer_columns_helper(, columns),
-Expr::AggregateFunction(aggregate_fn) => {
-outer_columns_helper_multi(aggregate_fn.args.iter(), columns);
-if let Some(filter) = aggregate_fn.filter.as_ref() {
-outer_columns_helper(filter, columns);
+fn outer_columns(expr: , columns:  HashSet) {
+// inspect_expr_pre doesn't handle subquery references, so find them 
explicitly
+inspect_expr_pre(expr, |expr| {
+match expr {
+Expr::OuterReferenceColumn(_, col) => {
+columns.insert(col.clone());
 }
-if let Some(obs) = aggregate_fn.order_by.as_ref() {
-outer_columns_helper_multi(obs.iter(), columns);
+Expr::ScalarSubquery(subquery) => {
+outer_columns_helper_multi(_ref_columns, 
columns);
 }
-}
-Expr::WindowFunction(window_fn) => {
-outer_columns_helper_multi(window_fn.args.iter(), columns);
-outer_columns_helper_multi(window_fn.order_by.iter(), columns);
-outer_co

(arrow-datafusion) branch main updated: feat: native types in `DistinctCountAccumulator` for primitive types (#8721)

2024-01-05 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 561d941854 feat: native types in `DistinctCountAccumulator` for 
primitive types (#8721)
561d941854 is described below

commit 561d941854e96bed2de3c7a4c6de9afab622a08b
Author: Eduard Karacharov <13005055+kor...@users.noreply.github.com>
AuthorDate: Fri Jan 5 11:32:17 2024 +0200

feat: native types in `DistinctCountAccumulator` for primitive types (#8721)

* DistinctCountGroupsAccumulator

* test coverage

* clippy warnings

* count distinct for primitive types

* revert hashset to std

* fixed accumulator size estimation
---
 .../physical-expr/src/aggregate/count_distinct.rs  | 298 -
 .../physical-expr/src/aggregate/sum_distinct.rs|  22 +-
 datafusion/physical-expr/src/aggregate/utils.rs|  20 +-
 3 files changed, 311 insertions(+), 29 deletions(-)

diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs 
b/datafusion/physical-expr/src/aggregate/count_distinct.rs
index c2fd32a96c..f7c13948b2 100644
--- a/datafusion/physical-expr/src/aggregate/count_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs
@@ -15,21 +15,32 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::datatypes::{DataType, Field};
+use arrow::datatypes::{DataType, Field, TimeUnit};
+use arrow_array::types::{
+ArrowPrimitiveType, Date32Type, Date64Type, Decimal128Type, Decimal256Type,
+Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, 
Int8Type,
+Time32MillisecondType, Time32SecondType, Time64MicrosecondType, 
Time64NanosecondType,
+TimestampMicrosecondType, TimestampMillisecondType, 
TimestampNanosecondType,
+TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
+};
+use arrow_array::PrimitiveArray;
 
 use std::any::Any;
+use std::cmp::Eq;
 use std::fmt::Debug;
+use std::hash::Hash;
 use std::sync::Arc;
 
 use ahash::RandomState;
 use arrow::array::{Array, ArrayRef};
 use std::collections::HashSet;
 
-use crate::aggregate::utils::down_cast_any_ref;
+use crate::aggregate::utils::{down_cast_any_ref, Hashable};
 use crate::expressions::format_state_name;
 use crate::{AggregateExpr, PhysicalExpr};
-use datafusion_common::Result;
-use datafusion_common::ScalarValue;
+use datafusion_common::cast::{as_list_array, as_primitive_array};
+use datafusion_common::utils::array_into_list_array;
+use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::Accumulator;
 
 type DistinctScalarValues = ScalarValue;
@@ -60,6 +71,18 @@ impl DistinctCount {
 }
 }
 
+macro_rules! native_distinct_count_accumulator {
+($TYPE:ident) => {{
+Ok(Box::new(NativeDistinctCountAccumulator::<$TYPE>::new()))
+}};
+}
+
+macro_rules! float_distinct_count_accumulator {
+($TYPE:ident) => {{
+Ok(Box::new(FloatDistinctCountAccumulator::<$TYPE>::new()))
+}};
+}
+
 impl AggregateExpr for DistinctCount {
 /// Return a reference to Any that can be used for downcasting
 fn as_any() ->  Any {
@@ -83,10 +106,57 @@ impl AggregateExpr for DistinctCount {
 }
 
 fn create_accumulator() -> Result> {
-Ok(Box::new(DistinctCountAccumulator {
-values: HashSet::default(),
-state_data_type: self.state_data_type.clone(),
-}))
+use DataType::*;
+use TimeUnit::*;
+
+match _data_type {
+Int8 => native_distinct_count_accumulator!(Int8Type),
+Int16 => native_distinct_count_accumulator!(Int16Type),
+Int32 => native_distinct_count_accumulator!(Int32Type),
+Int64 => native_distinct_count_accumulator!(Int64Type),
+UInt8 => native_distinct_count_accumulator!(UInt8Type),
+UInt16 => native_distinct_count_accumulator!(UInt16Type),
+UInt32 => native_distinct_count_accumulator!(UInt32Type),
+UInt64 => native_distinct_count_accumulator!(UInt64Type),
+Decimal128(_, _) => 
native_distinct_count_accumulator!(Decimal128Type),
+Decimal256(_, _) => 
native_distinct_count_accumulator!(Decimal256Type),
+
+Date32 => native_distinct_count_accumulator!(Date32Type),
+Date64 => native_distinct_count_accumulator!(Date64Type),
+Time32(Millisecond) => {
+native_distinct_count_accumulator!(Time32MillisecondType)
+}
+Time32(Second) => {
+native_distinct_count_accumulator!(Time32SecondType)
+}
+Time64(Microsecond) => {
+native_distinct_count_accumulator!(Time64MicrosecondType)
+}
+

(arrow-rs) branch master updated: Improve regexp kernels performance by avoiding cloning Regex (#5235)

2023-12-23 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
 new 72c9505bed Improve regexp kernels performance by avoiding cloning 
Regex (#5235)
72c9505bed is described below

commit 72c9505bed6260865fa95a637f37219cdc129a0e
Author: Liang-Chi Hsieh 
AuthorDate: Sat Dec 23 02:09:55 2023 -0800

Improve regexp kernels performance by avoiding cloning Regex (#5235)

* Improve regexp_match performance by avoiding cloning Regex

* For review
---
 arrow-string/src/regexp.rs  | 10 --
 arrow/Cargo.toml|  5 +
 arrow/benches/regexp_kernels.rs | 44 +
 3 files changed, 53 insertions(+), 6 deletions(-)

diff --git a/arrow-string/src/regexp.rs b/arrow-string/src/regexp.rs
index 34bb1b0b4c..25c712d20f 100644
--- a/arrow-string/src/regexp.rs
+++ b/arrow-string/src/regexp.rs
@@ -81,15 +81,14 @@ pub fn regexp_is_match_utf8(
 (Some(value), Some(pattern)) => {
 let existing_pattern = patterns.get();
 let re = match existing_pattern {
-Some(re) => re.clone(),
+Some(re) => re,
 None => {
 let re = Regex::new(pattern.as_str()).map_err(|e| {
 ArrowError::ComputeError(format!(
 "Regular expression did not compile: {e:?}"
 ))
 })?;
-patterns.insert(pattern, re.clone());
-re
+patterns.entry(pattern).or_insert(re)
 }
 };
 result.append(re.is_match(value));
@@ -216,15 +215,14 @@ pub fn regexp_match(
 (Some(value), Some(pattern)) => {
 let existing_pattern = patterns.get();
 let re = match existing_pattern {
-Some(re) => re.clone(),
+Some(re) => re,
 None => {
 let re = Regex::new(pattern.as_str()).map_err(|e| {
 ArrowError::ComputeError(format!(
 "Regular expression did not compile: {e:?}"
 ))
 })?;
-patterns.insert(pattern, re.clone());
-re
+patterns.entry(pattern).or_insert(re)
 }
 };
 match re.captures(value) {
diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml
index a6b4ddf51d..168a58b295 100644
--- a/arrow/Cargo.toml
+++ b/arrow/Cargo.toml
@@ -247,6 +247,11 @@ name = "substring_kernels"
 harness = false
 required-features = ["test_utils"]
 
+[[bench]]
+name = "regexp_kernels"
+harness = false
+required-features = ["test_utils"]
+
 [[bench]]
 name = "array_data_validate"
 harness = false
diff --git a/arrow/benches/regexp_kernels.rs b/arrow/benches/regexp_kernels.rs
new file mode 100644
index 00..eb38ba6783
--- /dev/null
+++ b/arrow/benches/regexp_kernels.rs
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_use]
+extern crate criterion;
+use criterion::Criterion;
+
+extern crate arrow;
+
+use arrow::array::*;
+use arrow::compute::kernels::regexp::*;
+use arrow::util::bench_util::*;
+
+fn bench_regexp(arr: , regex_array: 
) {
+regexp_match(criterion::black_box(arr), regex_array, None).unwrap();
+}
+
+fn add_benchmark(c:  Criterion) {
+let size = 65536;
+let val_len = 1000;
+
+let arr_string = create_string_array_with_len::(size, 0.0, val_len);
+let pattern_values = vec![r".*-(\d*)-.*"; size];
+let pattern = GenericStringArrayfrom(pattern_values);
+
+c.bench_function("regexp", |b| b.iter(|| bench_regexp(_string, 
)));
+}
+
+criterion_group!(benches, add_benchmark);
+criterion_main!(benches);



(arrow-datafusion) branch main updated: Fix InListExpr to return the correct number of rows (#8601)

2023-12-22 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 55121d8e48 Fix InListExpr  to return the correct number of rows (#8601)
55121d8e48 is described below

commit 55121d8e48d99178a72a5dbaa773f1fbf4a2e059
Author: Andrew Lamb 
AuthorDate: Fri Dec 22 06:15:13 2023 -0500

Fix InListExpr  to return the correct number of rows (#8601)

* Fix InListExpr  to return the correct number of rows

* Reduce repetition
---
 .../physical-expr/src/expressions/in_list.rs   | 57 --
 1 file changed, 53 insertions(+), 4 deletions(-)

diff --git a/datafusion/physical-expr/src/expressions/in_list.rs 
b/datafusion/physical-expr/src/expressions/in_list.rs
index 625b01ec9a..1a1634081c 100644
--- a/datafusion/physical-expr/src/expressions/in_list.rs
+++ b/datafusion/physical-expr/src/expressions/in_list.rs
@@ -349,17 +349,18 @@ impl PhysicalExpr for InListExpr {
 }
 
 fn evaluate(, batch: ) -> Result {
+let num_rows = batch.num_rows();
 let value = self.expr.evaluate(batch)?;
 let r = match _filter {
-Some(f) => f.contains(value.into_array(1)?.as_ref(), 
self.negated)?,
+Some(f) => f.contains(value.into_array(num_rows)?.as_ref(), 
self.negated)?,
 None => {
-let value = value.into_array(batch.num_rows())?;
+let value = value.into_array(num_rows)?;
 let found = self.list.iter().map(|expr| 
expr.evaluate(batch)).try_fold(
-
BooleanArray::new(BooleanBuffer::new_unset(batch.num_rows()), None),
+BooleanArray::new(BooleanBuffer::new_unset(num_rows), 
None),
 |result, expr| -> Result {
 Ok(or_kleene(
 ,
-(, ?.into_array(batch.num_rows())?)?,
+(, ?.into_array(num_rows)?)?,
 )?)
 },
 )?;
@@ -1267,4 +1268,52 @@ mod tests {
 
 Ok(())
 }
+
+#[test]
+fn in_list_no_cols() -> Result<()> {
+// test logic when the in_list expression doesn't have any columns
+let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+let a = Int32Array::from(vec![Some(1), Some(2), None]);
+let batch = RecordBatch::try_new(Arc::new(schema.clone()), 
vec![Arc::new(a)])?;
+
+let list = vec![lit(ScalarValue::from(1i32)), 
lit(ScalarValue::from(6i32))];
+
+// 1 IN (1, 6)
+let expr = lit(ScalarValue::Int32(Some(1)));
+in_list!(
+batch,
+list.clone(),
+,
+// should have three outputs, as the input batch has three rows
+vec![Some(true), Some(true), Some(true)],
+expr,
+
+);
+
+// 2 IN (1, 6)
+let expr = lit(ScalarValue::Int32(Some(2)));
+in_list!(
+batch,
+list.clone(),
+,
+// should have three outputs, as the input batch has three rows
+vec![Some(false), Some(false), Some(false)],
+expr,
+
+);
+
+// NULL IN (1, 6)
+let expr = lit(ScalarValue::Int32(None));
+in_list!(
+batch,
+list.clone(),
+,
+// should have three outputs, as the input batch has three rows
+vec![None, None, None],
+expr,
+
+);
+
+Ok(())
+}
 }



(arrow-datafusion) branch main updated: Implement logical plan serde for CopyTo (#8618)

2023-12-22 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 0ff5305db6 Implement logical plan serde for CopyTo (#8618)
0ff5305db6 is described below

commit 0ff5305db6b03128282d31afac69fa727e1fe7c4
Author: Andy Grove 
AuthorDate: Fri Dec 22 04:14:45 2023 -0700

Implement logical plan serde for CopyTo (#8618)

* Implement logical plan serde for CopyTo

* add link to issue

* clippy

* remove debug logging
---
 datafusion/proto/proto/datafusion.proto|  21 ++
 datafusion/proto/src/generated/pbjson.rs   | 395 +
 datafusion/proto/src/generated/prost.rs|  43 ++-
 datafusion/proto/src/logical_plan/mod.rs   |  86 -
 .../proto/tests/cases/roundtrip_logical_plan.rs|  68 +++-
 5 files changed, 603 insertions(+), 10 deletions(-)

diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index cc802ee957..05f0b64343 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -74,6 +74,7 @@ message LogicalPlanNode {
 PrepareNode prepare = 26;
 DropViewNode drop_view = 27;
 DistinctOnNode distinct_on = 28;
+CopyToNode copy_to = 29;
   }
 }
 
@@ -317,6 +318,26 @@ message DistinctOnNode {
   LogicalPlanNode input = 4;
 }
 
+message CopyToNode {
+LogicalPlanNode input = 1;
+string output_url = 2;
+bool single_file_output = 3;
+oneof CopyOptions {
+SQLOptions sql_options = 4;
+FileTypeWriterOptions writer_options = 5;
+}
+string file_type = 6;
+}
+
+message SQLOptions {
+  repeated SQLOption option = 1;
+}
+
+message SQLOption {
+string key = 1;
+string value = 2;
+}
+
 message UnionNode {
   repeated LogicalPlanNode inputs = 1;
 }
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index fb3a3ad91d..0fdeab0a40 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -3704,6 +3704,188 @@ impl<'de> serde::Deserialize<'de> for Constraints {
 deserializer.deserialize_struct("datafusion.Constraints", FIELDS, 
GeneratedVisitor)
 }
 }
+impl serde::Serialize for CopyToNode {
+#[allow(deprecated)]
+fn serialize(, serializer: S) -> std::result::Result
+where
+S: serde::Serializer,
+{
+use serde::ser::SerializeStruct;
+let mut len = 0;
+if self.input.is_some() {
+len += 1;
+}
+if !self.output_url.is_empty() {
+len += 1;
+}
+if self.single_file_output {
+len += 1;
+}
+if !self.file_type.is_empty() {
+len += 1;
+}
+if self.copy_options.is_some() {
+len += 1;
+}
+let mut struct_ser = 
serializer.serialize_struct("datafusion.CopyToNode", len)?;
+if let Some(v) = self.input.as_ref() {
+struct_ser.serialize_field("input", v)?;
+}
+if !self.output_url.is_empty() {
+struct_ser.serialize_field("outputUrl", _url)?;
+}
+if self.single_file_output {
+struct_ser.serialize_field("singleFileOutput", 
_file_output)?;
+}
+if !self.file_type.is_empty() {
+struct_ser.serialize_field("fileType", _type)?;
+}
+if let Some(v) = self.copy_options.as_ref() {
+match v {
+copy_to_node::CopyOptions::SqlOptions(v) => {
+struct_ser.serialize_field("sqlOptions", v)?;
+}
+copy_to_node::CopyOptions::WriterOptions(v) => {
+struct_ser.serialize_field("writerOptions", v)?;
+}
+}
+}
+struct_ser.end()
+}
+}
+impl<'de> serde::Deserialize<'de> for CopyToNode {
+#[allow(deprecated)]
+fn deserialize(deserializer: D) -> std::result::Result
+where
+D: serde::Deserializer<'de>,
+{
+const FIELDS: &[] = &[
+"input",
+"output_url",
+"outputUrl",
+"single_file_output",
+"singleFileOutput",
+"file_type",
+"fileType",
+"sql_options",
+"sqlOptions",
+"writer_options",
+"writerOptions",
+];
+
+#[allow(clippy::enum_variant_names)]
+enum GeneratedField {
+Input,
+OutputUrl,
+SingleFileOutput,
+FileType,
+SqlOpt

(arrow-datafusion) branch main updated: refactor: `HashJoinStream` state machine (#8538)

2023-12-18 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new a71a76a996 refactor: `HashJoinStream` state machine (#8538)
a71a76a996 is described below

commit a71a76a996a32a0f068370940ebe475ec237b4ff
Author: Eduard Karacharov <13005055+kor...@users.noreply.github.com>
AuthorDate: Mon Dec 18 11:53:26 2023 +0200

refactor: `HashJoinStream` state machine (#8538)

* hash join state machine

* StreamJoinStateResult to StatefulStreamResult

* doc comments & naming & fmt

* suggestions from code review

Co-authored-by: Andrew Lamb 

* more review comments addressed

* post-merge fixes

-

Co-authored-by: Andrew Lamb 
---
 datafusion/physical-plan/src/joins/hash_join.rs| 431 ++---
 .../physical-plan/src/joins/stream_join_utils.rs   | 127 ++
 .../physical-plan/src/joins/symmetric_hash_join.rs |  25 +-
 datafusion/physical-plan/src/joins/utils.rs|  83 
 4 files changed, 420 insertions(+), 246 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join.rs 
b/datafusion/physical-plan/src/joins/hash_join.rs
index 4846d0a5e0..13ac06ee30 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -28,7 +28,6 @@ use crate::joins::utils::{
 calculate_join_output_ordering, get_final_indices_from_bit_map,
 need_produce_result_in_final, JoinHashMap, JoinHashMapType,
 };
-use crate::DisplayAs;
 use crate::{
 coalesce_batches::concat_batches,
 coalesce_partitions::CoalescePartitionsExec,
@@ -38,12 +37,13 @@ use crate::{
 joins::utils::{
 adjust_right_output_partitioning, build_join_schema, 
check_join_is_valid,
 estimate_join_statistics, partitioned_join_output_partitioning,
-BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinOn,
+BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinOn, 
StatefulStreamResult,
 },
 metrics::{ExecutionPlanMetricsSet, MetricsSet},
 DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
 RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
+use crate::{handle_state, DisplayAs};
 
 use super::{
 utils::{OnceAsync, OnceFut},
@@ -618,15 +618,14 @@ impl ExecutionPlan for HashJoinExec {
 on_right,
 filter: self.filter.clone(),
 join_type: self.join_type,
-left_fut,
-visited_left_side: None,
 right: right_stream,
 column_indices: self.column_indices.clone(),
 random_state: self.random_state.clone(),
 join_metrics,
 null_equals_null: self.null_equals_null,
-is_exhausted: false,
 reservation,
+state: HashJoinStreamState::WaitBuildSide,
+build_side: BuildSide::Initial(BuildSideInitialState { left_fut }),
 }))
 }
 
@@ -789,6 +788,104 @@ where
 Ok(())
 }
 
+/// Represents build-side of hash join.
+enum BuildSide {
+/// Indicates that build-side not collected yet
+Initial(BuildSideInitialState),
+/// Indicates that build-side data has been collected
+Ready(BuildSideReadyState),
+}
+
+/// Container for BuildSide::Initial related data
+struct BuildSideInitialState {
+/// Future for building hash table from build-side input
+left_fut: OnceFut,
+}
+
+/// Container for BuildSide::Ready related data
+struct BuildSideReadyState {
+/// Collected build-side data
+left_data: Arc,
+/// Which build-side rows have been matched while creating output.
+/// For some OUTER joins, we need to know which rows have not been matched
+/// to produce the correct output.
+visited_left_side: BooleanBufferBuilder,
+}
+
+impl BuildSide {
+/// Tries to extract BuildSideInitialState from BuildSide enum.
+/// Returns an error if state is not Initial.
+fn try_as_initial_mut( self) -> Result< BuildSideInitialState> {
+match self {
+BuildSide::Initial(state) => Ok(state),
+_ => internal_err!("Expected build side in initial state"),
+}
+}
+
+/// Tries to extract BuildSideReadyState from BuildSide enum.
+/// Returns an error if state is not Ready.
+fn try_as_ready() -> Result<> {
+match self {
+BuildSide::Ready(state) => Ok(state),
+_ => internal_err!("Expected build side in ready state"),
+}
+}
+
+/// Tries to extract BuildSideReadyState from BuildSide enum.
+/// Returns an error if state is not Ready.
+fn try_as_ready_mut( self) -> Result< BuildSideReadyState> {
+match self {
+BuildSide::Ready(state) => Ok

(arrow-datafusion) branch main updated: Remove order_bys from AggregateExec state (#8537)

2023-12-14 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new d67c0bbecd Remove order_bys from AggregateExec state (#8537)
d67c0bbecd is described below

commit d67c0bbecd8f32049de2c931c077a66ed640413a
Author: Mustafa Akur <106137913+mustafasr...@users.noreply.github.com>
AuthorDate: Thu Dec 14 23:15:15 2023 +0300

Remove order_bys from AggregateExec state (#8537)

* Initial commit

* Remove order by from aggregate exec state
---
 .../src/physical_optimizer/aggregate_statistics.rs | 12 --
 .../combine_partial_final_agg.rs   |  4 
 .../src/physical_optimizer/enforce_distribution.rs |  4 
 .../limited_distinct_aggregation.rs| 25 ---
 .../core/src/physical_optimizer/test_utils.rs  |  1 -
 .../src/physical_optimizer/topk_aggregation.rs |  1 -
 datafusion/core/src/physical_planner.rs|  5 +---
 datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs |  2 --
 datafusion/physical-plan/src/aggregates/mod.rs | 28 --
 datafusion/physical-plan/src/limit.rs  |  1 -
 datafusion/proto/proto/datafusion.proto|  1 -
 datafusion/proto/src/generated/pbjson.rs   | 18 --
 datafusion/proto/src/generated/prost.rs|  2 --
 datafusion/proto/src/physical_plan/mod.rs  | 21 
 .../proto/tests/cases/roundtrip_physical_plan.rs   |  3 ---
 15 files changed, 9 insertions(+), 119 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs 
b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
index 795857b10e..86a8cdb7b3 100644
--- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -397,7 +397,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 source,
 Arc::clone(),
 )?;
@@ -407,7 +406,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 Arc::new(partial_agg),
 Arc::clone(),
 )?;
@@ -429,7 +427,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 source,
 Arc::clone(),
 )?;
@@ -439,7 +436,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 Arc::new(partial_agg),
 Arc::clone(),
 )?;
@@ -460,7 +456,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 source,
 Arc::clone(),
 )?;
@@ -473,7 +468,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 Arc::new(coalesce),
 Arc::clone(),
 )?;
@@ -494,7 +488,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 source,
 Arc::clone(),
 )?;
@@ -507,7 +500,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 Arc::new(coalesce),
 Arc::clone(),
 )?;
@@ -539,7 +531,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 filter,
 Arc::clone(),
 )?;
@@ -549,7 +540,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 Arc::new(partial_agg),
 Arc::clone(),
 )?;
@@ -586,7 +576,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 filter,
 Arc::clone(),
 )?;
@@ -596,7 +585,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 Arc::new(partial_agg),
 Arc::clone(),
 )?;
diff --git 
a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs 
b/datafusion/core/src/physical_opt

(arrow-datafusion) branch main updated: Add `today` alias for `current_date` (#8423)

2023-12-12 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 11542740a9 Add `today` alias for `current_date` (#8423)
11542740a9 is described below

commit 11542740a982d3aba36f43d93967a58bd9ab8d9b
Author: jokercurry <982458...@qq.com>
AuthorDate: Tue Dec 12 21:23:08 2023 +0800

Add `today` alias for `current_date` (#8423)

* fix conflict

* add md

* add test

* add test

* addr comments

* Update datafusion/sqllogictest/test_files/timestamps.slt

Co-authored-by: Alex Huang 

-

Co-authored-by: zhongjingxiong 
Co-authored-by: Alex Huang 
---
 datafusion/expr/src/built_in_function.rs  |  2 +-
 datafusion/sqllogictest/test_files/timestamps.slt | 24 +++
 docs/source/user-guide/sql/scalar_functions.md|  9 +
 3 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/datafusion/expr/src/built_in_function.rs 
b/datafusion/expr/src/built_in_function.rs
index 5a903a73ad..fd899289ac 100644
--- a/datafusion/expr/src/built_in_function.rs
+++ b/datafusion/expr/src/built_in_function.rs
@@ -1532,7 +1532,7 @@ impl BuiltinScalarFunction {
 
 // time/date functions
 BuiltinScalarFunction::Now => &["now"],
-BuiltinScalarFunction::CurrentDate => &["current_date"],
+BuiltinScalarFunction::CurrentDate => &["current_date", "today"],
 BuiltinScalarFunction::CurrentTime => &["current_time"],
 BuiltinScalarFunction::DateBin => &["date_bin"],
 BuiltinScalarFunction::DateTrunc => &["date_trunc", "datetrunc"],
diff --git a/datafusion/sqllogictest/test_files/timestamps.slt 
b/datafusion/sqllogictest/test_files/timestamps.slt
index 71b6ddf33f..f956d59b1d 100644
--- a/datafusion/sqllogictest/test_files/timestamps.slt
+++ b/datafusion/sqllogictest/test_files/timestamps.slt
@@ -46,6 +46,30 @@ statement ok
 create table ts_data_secs as select arrow_cast(ts / 10, 
'Timestamp(Second, None)') as ts, value from ts_data;
 
 
+##
+## Current date Tests
+##
+
+query B
+select cast(now() as date) = current_date();
+
+true
+
+query B
+select now() = current_date();
+
+false
+
+query B
+select current_date() = today();
+
+true
+
+query B
+select cast(now() as date) = today();
+
+true
+
 
 ##
 ## Timestamp Handling Tests
diff --git a/docs/source/user-guide/sql/scalar_functions.md 
b/docs/source/user-guide/sql/scalar_functions.md
index 9a9bec9df7..ad4c6ed083 100644
--- a/docs/source/user-guide/sql/scalar_functions.md
+++ b/docs/source/user-guide/sql/scalar_functions.md
@@ -1280,6 +1280,7 @@ regexp_replace(str, regexp, replacement, flags)
 - [datepart](#datepart)
 - [extract](#extract)
 - [to_timestamp](#to_timestamp)
+- [today](#today)
 - [to_timestamp_millis](#to_timestamp_millis)
 - [to_timestamp_micros](#to_timestamp_micros)
 - [to_timestamp_seconds](#to_timestamp_seconds)
@@ -1308,6 +1309,14 @@ no matter when in the query plan the function executes.
 current_date()
 ```
 
+ Aliases
+
+- today
+
+### `today`
+
+_Alias of [current_date](#current_date)._
+
 ### `current_time`
 
 Returns the current UTC time.



(arrow-datafusion) branch main updated: update cast (#8458)

2023-12-08 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 3f6ff22d40 update cast (#8458)
3f6ff22d40 is described below

commit 3f6ff22d40e0d3373e1538929ec54ee1ec330fc9
Author: Alex Huang 
AuthorDate: Fri Dec 8 12:46:29 2023 +0100

update cast (#8458)
---
 datafusion/physical-expr/src/expressions/cast.rs | 6 ++
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/datafusion/physical-expr/src/expressions/cast.rs 
b/datafusion/physical-expr/src/expressions/cast.rs
index b3ca95292a..0c4ed3c125 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr/src/expressions/cast.rs
@@ -140,8 +140,7 @@ impl PhysicalExpr for CastExpr {
 let mut s = state;
 self.expr.hash( s);
 self.cast_type.hash( s);
-// Add `self.cast_options` when hash is available
-// https://github.com/apache/arrow-rs/pull/4395
+self.cast_options.hash( s);
 }
 
 /// A [`CastExpr`] preserves the ordering of its child.
@@ -157,8 +156,7 @@ impl PartialEq for CastExpr {
 .map(|x| {
 self.expr.eq()
 && self.cast_type == x.cast_type
-// TODO: Use 
https://github.com/apache/arrow-rs/issues/2966 when available
-&& self.cast_options.safe == x.cast_options.safe
+&& self.cast_options == x.cast_options
 })
 .unwrap_or(false)
 }



(arrow-datafusion) branch main updated: Not fail when window input is empty record batch (#8466)

2023-12-08 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new ecb7c7da95 Not fail when window input is empty record batch (#8466)
ecb7c7da95 is described below

commit ecb7c7da957d4cc9a772b7c9b9c36e57292ee699
Author: Mustafa Akur <106137913+mustafasr...@users.noreply.github.com>
AuthorDate: Fri Dec 8 14:44:53 2023 +0300

Not fail when window input is empty record batch (#8466)
---
 datafusion/common/src/utils.rs | 10 +++---
 .../physical-plan/src/windows/bounded_window_agg_exec.rs   |  9 ++---
 datafusion/sqllogictest/test_files/window.slt  |  6 --
 3 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs
index 9094ecd063..fecab8835e 100644
--- a/datafusion/common/src/utils.rs
+++ b/datafusion/common/src/utils.rs
@@ -25,7 +25,7 @@ use arrow::compute;
 use arrow::compute::{partition, SortColumn, SortOptions};
 use arrow::datatypes::{Field, SchemaRef, UInt32Type};
 use arrow::record_batch::RecordBatch;
-use arrow_array::{Array, LargeListArray, ListArray};
+use arrow_array::{Array, LargeListArray, ListArray, RecordBatchOptions};
 use arrow_schema::DataType;
 use sqlparser::ast::Ident;
 use sqlparser::dialect::GenericDialect;
@@ -90,8 +90,12 @@ pub fn get_record_batch_at_indices(
 indices: ,
 ) -> Result {
 let new_columns = get_arrayref_at_indices(record_batch.columns(), 
indices)?;
-RecordBatch::try_new(record_batch.schema(), new_columns)
-.map_err(DataFusionError::ArrowError)
+RecordBatch::try_new_with_options(
+record_batch.schema(),
+new_columns,
+::new().with_row_count(Some(indices.len())),
+)
+.map_err(DataFusionError::ArrowError)
 }
 
 /// This function compares two tuples depending on the given sort options.
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs 
b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index f988b28cce..431a43bc60 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -40,7 +40,7 @@ use crate::{
 };
 
 use arrow::{
-array::{Array, ArrayRef, UInt32Builder},
+array::{Array, ArrayRef, RecordBatchOptions, UInt32Builder},
 compute::{concat, concat_batches, sort_to_indices},
 datatypes::{Schema, SchemaBuilder, SchemaRef},
 record_batch::RecordBatch,
@@ -1026,8 +1026,11 @@ impl BoundedWindowAggStream {
 .iter()
 .map(|elem| elem.slice(n_out, n_to_keep))
 .collect::>();
-self.input_buffer =
-RecordBatch::try_new(self.input_buffer.schema(), batch_to_keep)?;
+self.input_buffer = RecordBatch::try_new_with_options(
+self.input_buffer.schema(),
+batch_to_keep,
+::new().with_row_count(Some(n_to_keep)),
+)?;
 Ok(())
 }
 
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index b660a9a0c2..7846bb001a 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3771,10 +3771,12 @@ select a,
 1 1
 2 1
 
-# TODO: this works in Postgres which returns [1, 1].
-query error DataFusion error: Arrow error: Invalid argument error: must either 
specify a row count or at least one column
+query I
 select rank() over (RANGE between UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) 
rnk
from (select 1 a union select 2 a) q;
+
+1
+1
 
 query II
 select a,



(arrow-datafusion) branch main updated: Preserve `dict_id` on `Field` during serde roundtrip (#8457)

2023-12-08 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new a8d74a7b14 Preserve `dict_id` on `Field` during serde roundtrip (#8457)
a8d74a7b14 is described below

commit a8d74a7b141a63430f10c313c55aad059d81ecb5
Author: Brent Gardner 
AuthorDate: Fri Dec 8 04:10:39 2023 -0700

Preserve `dict_id` on `Field` during serde roundtrip (#8457)

* Failing test

* Passing test
---
 datafusion/proto/proto/datafusion.proto|  2 ++
 datafusion/proto/src/generated/pbjson.rs   | 39 ++
 datafusion/proto/src/generated/prost.rs|  4 +++
 datafusion/proto/src/logical_plan/from_proto.rs| 16 +++--
 datafusion/proto/src/logical_plan/to_proto.rs  |  2 ++
 .../proto/tests/cases/roundtrip_logical_plan.rs| 39 ++
 6 files changed, 100 insertions(+), 2 deletions(-)

diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index 863e3c315c..de7afd5c7b 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -840,6 +840,8 @@ message Field {
   // for complex data types like structs, unions
   repeated Field children = 4;
   map metadata = 5;
+  int64 dict_id = 6;
+  bool dict_ordered = 7;
 }
 
 message FixedSizeBinary{
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index 74798ee8e9..3001a9c095 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -6910,6 +6910,12 @@ impl serde::Serialize for Field {
 if !self.metadata.is_empty() {
 len += 1;
 }
+if self.dict_id != 0 {
+len += 1;
+}
+if self.dict_ordered {
+len += 1;
+}
 let mut struct_ser = serializer.serialize_struct("datafusion.Field", 
len)?;
 if !self.name.is_empty() {
 struct_ser.serialize_field("name", )?;
@@ -6926,6 +6932,13 @@ impl serde::Serialize for Field {
 if !self.metadata.is_empty() {
 struct_ser.serialize_field("metadata", )?;
 }
+if self.dict_id != 0 {
+#[allow(clippy::needless_borrow)]
+struct_ser.serialize_field("dictId", 
ToString::to_string(_id).as_str())?;
+}
+if self.dict_ordered {
+struct_ser.serialize_field("dictOrdered", _ordered)?;
+}
 struct_ser.end()
 }
 }
@@ -6942,6 +6955,10 @@ impl<'de> serde::Deserialize<'de> for Field {
 "nullable",
 "children",
 "metadata",
+"dict_id",
+"dictId",
+"dict_ordered",
+"dictOrdered",
 ];
 
 #[allow(clippy::enum_variant_names)]
@@ -6951,6 +6968,8 @@ impl<'de> serde::Deserialize<'de> for Field {
 Nullable,
 Children,
 Metadata,
+DictId,
+DictOrdered,
 }
 impl<'de> serde::Deserialize<'de> for GeneratedField {
 fn deserialize(deserializer: D) -> 
std::result::Result
@@ -6977,6 +6996,8 @@ impl<'de> serde::Deserialize<'de> for Field {
 "nullable" => Ok(GeneratedField::Nullable),
 "children" => Ok(GeneratedField::Children),
 "metadata" => Ok(GeneratedField::Metadata),
+"dictId" | "dict_id" => Ok(GeneratedField::DictId),
+"dictOrdered" | "dict_ordered" => 
Ok(GeneratedField::DictOrdered),
 _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
 }
 }
@@ -7001,6 +7022,8 @@ impl<'de> serde::Deserialize<'de> for Field {
 let mut nullable__ = None;
 let mut children__ = None;
 let mut metadata__ = None;
+let mut dict_id__ = None;
+let mut dict_ordered__ = None;
 while let Some(k) = map_.next_key()? {
 match k {
 GeneratedField::Name => {
@@ -7035,6 +7058,20 @@ impl<'de> serde::Deserialize<'de> for Field {
 map_.next_value::>()?
 );
 }
+GeneratedField::DictId => {
+if dict_id__.is_some() {
+return 
Err(serde::de::Error::duplicate_field("d

(arrow-datafusion) branch main updated: Fix bug in optimizing a nested count (#8459)

2023-12-07 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new d5dd535199 Fix bug in optimizing a nested count (#8459)
d5dd535199 is described below

commit d5dd5351995bb09797827d879af070631b6f58c7
Author: Daniël Heres 
AuthorDate: Thu Dec 7 22:47:34 2023 +0100

Fix bug in optimizing a nested count (#8459)

* Fix nested count optimization

* fmt

* extend comment

* Clippy

* Update datafusion/optimizer/src/optimize_projections.rs

Co-authored-by: Liang-Chi Hsieh 

* Add sqllogictests

* Fmt

-

Co-authored-by: Liang-Chi Hsieh 
---
 datafusion/optimizer/src/optimize_projections.rs | 40 +---
 datafusion/sqllogictest/test_files/aggregate.slt | 13 
 2 files changed, 49 insertions(+), 4 deletions(-)

diff --git a/datafusion/optimizer/src/optimize_projections.rs 
b/datafusion/optimizer/src/optimize_projections.rs
index 440e12cc26..8bee295154 100644
--- a/datafusion/optimizer/src/optimize_projections.rs
+++ b/datafusion/optimizer/src/optimize_projections.rs
@@ -192,7 +192,7 @@ fn optimize_projections(
 let new_group_bys = aggregate.group_expr.clone();
 
 // Only use absolutely necessary aggregate expressions required by 
parent.
-let new_aggr_expr = get_at_indices(_expr, 
_reqs);
+let mut new_aggr_expr = get_at_indices(_expr, 
_reqs);
 let all_exprs_iter = 
new_group_bys.iter().chain(new_aggr_expr.iter());
 let necessary_indices =
 indices_referred_by_exprs(, all_exprs_iter)?;
@@ -213,6 +213,16 @@ fn optimize_projections(
 let (aggregate_input, _is_added) =
 add_projection_on_top_if_helpful(aggregate_input, 
necessary_exprs, true)?;
 
+// Aggregate always needs at least one aggregate expression.
+// With a nested count we don't require any column as input, but 
still need to create a correct aggregate
+// The aggregate may be optimized out later (select count(*) from 
(select count(*) from [...]) always returns 1
+if new_aggr_expr.is_empty()
+&& new_group_bys.is_empty()
+&& !aggregate.aggr_expr.is_empty()
+{
+new_aggr_expr = vec![aggregate.aggr_expr[0].clone()];
+}
+
 // Create new aggregate plan with updated input, and absolutely 
necessary fields.
 return Aggregate::try_new(
 Arc::new(aggregate_input),
@@ -857,10 +867,11 @@ fn rewrite_projection_given_requirements(
 #[cfg(test)]
 mod tests {
 use crate::optimize_projections::OptimizeProjections;
-use datafusion_common::Result;
+use arrow::datatypes::{DataType, Field, Schema};
+use datafusion_common::{Result, TableReference};
 use datafusion_expr::{
-binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, 
LogicalPlan,
-Operator,
+binary_expr, col, count, lit, 
logical_plan::builder::LogicalPlanBuilder,
+table_scan, Expr, LogicalPlan, Operator,
 };
 use std::sync::Arc;
 
@@ -909,4 +920,25 @@ mod tests {
 \n  TableScan: test projection=[a]";
 assert_optimized_plan_equal(, expected)
 }
+#[test]
+fn test_nested_count() -> Result<()> {
+let schema = Schema::new(vec![Field::new("foo", DataType::Int32, 
false)]);
+
+let groups: Vec = vec![];
+
+let plan = table_scan(TableReference::none(), , None)
+.unwrap()
+.aggregate(groups.clone(), vec![count(lit(1))])
+.unwrap()
+.aggregate(groups, vec![count(lit(1))])
+.unwrap()
+.build()
+.unwrap();
+
+let expected = "Aggregate: groupBy=[[]], aggr=[[COUNT(Int32(1))]]\
+\n  Projection: \
+\nAggregate: groupBy=[[]], aggr=[[COUNT(Int32(1))]]\
+\n  TableScan: ?table? projection=[]";
+assert_optimized_plan_equal(, expected)
+}
 }
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt 
b/datafusion/sqllogictest/test_files/aggregate.slt
index e4718035a5..7cfc9c707d 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -3199,3 +3199,16 @@ FROM my_data
 GROUP BY dummy
 
 text1, text1, text1
+
+
+# Queries with nested count(*)
+
+query I
+select count(*) from (select count(*) from (select 1));
+
+1
+
+query I
+select count(*) from (select count(*) a, count(*) b from (select 1));
+
+1
\ No newline at end of file



(arrow-datafusion) branch main updated: Minor: Use `ScalarValue::from` impl for strings (#8429)

2023-12-06 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new fd92bcb225 Minor: Use `ScalarValue::from` impl for strings (#8429)
fd92bcb225 is described below

commit fd92bcb225ded5b9c4c8b2661a8b3a33868dda0f
Author: Andrew Lamb 
AuthorDate: Wed Dec 6 03:12:39 2023 -0500

Minor: Use `ScalarValue::from` impl for strings (#8429)

* Minor: Use ScalarValue::from impl for strings

* fix typo
---
 datafusion/common/src/pyarrow.rs   |  2 +-
 datafusion/common/src/scalar.rs| 25 ++---
 datafusion/core/src/datasource/listing/helpers.rs  | 20 +++
 .../core/src/datasource/physical_plan/avro.rs  |  3 +-
 .../core/src/datasource/physical_plan/csv.rs   |  3 +-
 .../datasource/physical_plan/file_scan_config.rs   | 42 +++---
 .../src/datasource/physical_plan/parquet/mod.rs|  4 +--
 datafusion/core/src/test/variable.rs   |  4 +--
 datafusion/execution/src/config.rs |  2 +-
 datafusion/expr/src/expr.rs|  2 +-
 datafusion/expr/src/literal.rs |  6 ++--
 .../src/simplify_expressions/expr_simplifier.rs|  9 ++---
 .../src/simplify_expressions/guarantees.rs |  6 ++--
 .../optimizer/src/simplify_expressions/regex.rs|  2 +-
 datafusion/physical-expr/benches/in_list.rs|  2 +-
 datafusion/physical-expr/src/aggregate/min_max.rs  | 14 ++--
 .../physical-expr/src/aggregate/string_agg.rs  |  2 +-
 .../physical-expr/src/datetime_expressions.rs  |  2 +-
 .../src/expressions/get_indexed_field.rs   |  2 +-
 datafusion/physical-expr/src/expressions/nullif.rs |  2 +-
 datafusion/physical-expr/src/functions.rs  |  4 +--
 datafusion/physical-plan/src/joins/cross_join.rs   | 32 +
 datafusion/physical-plan/src/projection.rs | 16 +++--
 datafusion/physical-plan/src/union.rs  | 24 -
 .../proto/tests/cases/roundtrip_logical_plan.rs|  2 +-
 .../proto/tests/cases/roundtrip_physical_plan.rs   |  2 +-
 datafusion/sql/src/expr/mod.rs |  4 +--
 datafusion/sql/tests/sql_integration.rs| 12 +++
 28 files changed, 83 insertions(+), 167 deletions(-)

diff --git a/datafusion/common/src/pyarrow.rs b/datafusion/common/src/pyarrow.rs
index aa01539193..f435647753 100644
--- a/datafusion/common/src/pyarrow.rs
+++ b/datafusion/common/src/pyarrow.rs
@@ -119,7 +119,7 @@ mod tests {
 ScalarValue::Boolean(Some(true)),
 ScalarValue::Int32(Some(23)),
 ScalarValue::Float64(Some(12.34)),
-ScalarValue::Utf8(Some("Hello!".to_string())),
+ScalarValue::from("Hello!"),
 ScalarValue::Date32(Some(1234)),
 ];
 
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 177fe00a6a..10f052b909 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -774,7 +774,7 @@ impl ScalarValue {
 
 /// Returns a [`ScalarValue::Utf8`] representing `val`
 pub fn new_utf8(val: impl Into) -> Self {
-ScalarValue::Utf8(Some(val.into()))
+ScalarValue::from(val.into())
 }
 
 /// Returns a [`ScalarValue::IntervalYearMonth`] representing
@@ -2699,7 +2699,7 @@ impl ScalarValue {
 
 /// Try to parse `value` into a ScalarValue of type `target_type`
 pub fn try_from_string(value: String, target_type: ) -> 
Result {
-let value = ScalarValue::Utf8(Some(value));
+let value = ScalarValue::from(value);
 let cast_options = CastOptions {
 safe: false,
 format_options: Default::default(),
@@ -3581,9 +3581,9 @@ mod tests {
 #[test]
 fn test_list_to_array_string() {
 let scalars = vec![
-ScalarValue::Utf8(Some(String::from("rust"))),
-ScalarValue::Utf8(Some(String::from("arrow"))),
-ScalarValue::Utf8(Some(String::from("data-fusion"))),
+ScalarValue::from("rust"),
+ScalarValue::from("arrow"),
+ScalarValue::from("data-fusion"),
 ];
 
 let array = ScalarValue::new_list(scalars.as_slice(), ::Utf8);
@@ -4722,7 +4722,7 @@ mod tests {
 Some(vec![
 ScalarValue::Int32(Some(23)),
 ScalarValue::Boolean(Some(false)),
-ScalarValue::Utf8(Some("Hello".to_string())),
+ScalarValue::from("Hello"),
 ScalarValue::from(vec![
 ("e", ScalarValue::from(2i16)),
 ("f", ScalarValue::from(3i64)),
@@ -4915,17 +4915,17 @@ mod tests {
 
 //

(arrow-datafusion) branch main updated: Update custom-table-providers.md (#8409)

2023-12-04 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new e5a95b17a9 Update custom-table-providers.md (#8409)
e5a95b17a9 is described below

commit e5a95b17a9013b3d1c9bbd719e2ba7c17fabeaf3
Author: Nick Poorman 
AuthorDate: Mon Dec 4 01:40:50 2023 -0700

Update custom-table-providers.md (#8409)
---
 docs/source/library-user-guide/custom-table-providers.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/source/library-user-guide/custom-table-providers.md 
b/docs/source/library-user-guide/custom-table-providers.md
index ca0e9de779..9da207da68 100644
--- a/docs/source/library-user-guide/custom-table-providers.md
+++ b/docs/source/library-user-guide/custom-table-providers.md
@@ -25,7 +25,7 @@ This section will also touch on how to have DataFusion use 
the new `TableProvide
 
 ## Table Provider and Scan
 
-The `scan` method on the `TableProvider` is likely its most important. It 
returns an `ExecutionPlan` that DataFusion will use to read the actual data 
during execution o the query.
+The `scan` method on the `TableProvider` is likely its most important. It 
returns an `ExecutionPlan` that DataFusion will use to read the actual data 
during execution of the query.
 
 ### Scan
 



(arrow-datafusion) branch main updated (3b298374f9 -> 340ecfdfe0)

2023-12-02 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


from 3b298374f9 Docs: Improve the documentation on `ScalarValue` (#8378)
 add 340ecfdfe0 Avoid concat for `array_replace` (#8337)

No new revisions were added by this update.

Summary of changes:
 datafusion/core/Cargo.toml|   4 +
 datafusion/core/benches/array_expression.rs   |  73 +
 datafusion/physical-expr/src/array_expressions.rs | 125 ++
 3 files changed, 135 insertions(+), 67 deletions(-)
 create mode 100644 datafusion/core/benches/array_expression.rs



(arrow-datafusion) branch main updated: Docs: Improve the documentation on `ScalarValue` (#8378)

2023-12-02 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 3b298374f9 Docs: Improve the documentation on `ScalarValue` (#8378)
3b298374f9 is described below

commit 3b298374f9706fd15e21744b3ffa00ae9e100377
Author: Andrew Lamb 
AuthorDate: Sat Dec 2 04:17:18 2023 -0500

Docs: Improve the documentation on `ScalarValue` (#8378)

* Minor: Improve the documentation on `ScalarValue`

* Update datafusion/common/src/scalar.rs

Co-authored-by: Liang-Chi Hsieh 

* Update datafusion/common/src/scalar.rs

Co-authored-by: Liang-Chi Hsieh 

-

Co-authored-by: Liang-Chi Hsieh 
---
 datafusion/common/src/scalar.rs | 47 +++--
 1 file changed, 45 insertions(+), 2 deletions(-)

diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 3431d71468..ef0edbd9e0 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -50,9 +50,52 @@ use arrow::{
 use arrow_array::cast::as_list_array;
 use arrow_array::{ArrowNativeTypeOp, Scalar};
 
-/// Represents a dynamically typed, nullable single value.
-/// This is the single-valued counter-part to arrow's [`Array`].
+/// A dynamically typed, nullable single value, (the single-valued counter-part
+/// to arrow's [`Array`])
 ///
+/// # Performance
+///
+/// In general, please use arrow [`Array`]s rather than [`ScalarValue`] 
whenever
+/// possible, as it is far more efficient for multiple values.
+///
+/// # Example
+/// ```
+/// # use datafusion_common::ScalarValue;
+/// // Create single scalar value for an Int32 value
+/// let s1 = ScalarValue::Int32(Some(10));
+///
+/// // You can also create values using the From impl:
+/// let s2 = ScalarValue::from(10i32);
+/// assert_eq!(s1, s2);
+/// ```
+///
+/// # Null Handling
+///
+/// `ScalarValue` represents null values in the same way as Arrow. Nulls are
+/// "typed" in the sense that a null value in an [`Int32Array`] is different
+/// than a null value in a [`Float64Array`], and is different than the values 
in
+/// a [`NullArray`].
+///
+/// ```
+/// # fn main() -> datafusion_common::Result<()> {
+/// # use std::collections::hash_set::Difference;
+/// # use datafusion_common::ScalarValue;
+/// # use arrow::datatypes::DataType;
+/// // You can create a 'null' Int32 value directly:
+/// let s1 = ScalarValue::Int32(None);
+///
+/// // You can also create a null value for a given datatype:
+/// let s2 = ScalarValue::try_from(::Int32)?;
+/// assert_eq!(s1, s2);
+///
+/// // Note that this is DIFFERENT than a `ScalarValue::Null`
+/// let s3 = ScalarValue::Null;
+/// assert_ne!(s1, s3);
+/// # Ok(())
+/// # }
+/// ```
+///
+/// # Further Reading
 /// See [datatypes](https://arrow.apache.org/docs/python/api/datatypes.html) 
for
 /// details on datatypes and the 
[format](https://github.com/apache/arrow/blob/master/format/Schema.fbs#L354-L375)
 /// for the definitive reference.



(arrow-ballista) branch main updated: Use lz4 compression for shuffle files & flight stream, refactoring / improvements (#920)

2023-11-30 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
 new 13456466 Use lz4 compression for shuffle files & flight stream, 
refactoring / improvements (#920)
13456466 is described below

commit 134564666c172d923c278390bde97a792a6fee52
Author: Daniël Heres 
AuthorDate: Thu Nov 30 18:02:17 2023 +

Use lz4 compression for shuffle files & flight stream, refactoring / 
improvements (#920)

* Use lz4 compression for shuffle files and streams

* Add feature

* Backport improvements

* More compression
---
 Cargo.toml |   2 +-
 .../core/src/execution_plans/shuffle_writer.rs |   9 +-
 ballista/core/src/utils.rs |   9 +-
 ballista/executor/src/flight_service.rs| 117 -
 4 files changed, 61 insertions(+), 76 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index e4b5f324..fcdebab6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,7 +29,7 @@ members = [
 resolver = "2"
 
 [workspace.dependencies]
-arrow = { version = "48.0.0" }
+arrow = { version = "48.0.0", features=["ipc_compression"] }
 arrow-flight = { version = "48.0.0", features = ["flight-sql-experimental"] }
 arrow-schema = { version = "48.0.0", default-features = false }
 configure_me = { version = "0.4.0" }
diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs 
b/ballista/core/src/execution_plans/shuffle_writer.rs
index 1896c206..2540a1d2 100644
--- a/ballista/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/core/src/execution_plans/shuffle_writer.rs
@@ -20,6 +20,8 @@
 //! partition is re-partitioned and streamed to disk in Arrow IPC format. 
Future stages of the query
 //! will use the ShuffleReaderExec to read these results.
 
+use datafusion::arrow::ipc::writer::IpcWriteOptions;
+use datafusion::arrow::ipc::CompressionType;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 
 use std::any::Any;
@@ -242,9 +244,14 @@ impl ShuffleWriterExec {
 ));
 debug!("Writing results to {:?}", 
path);
 
-let mut writer = IPCWriter::new(
+let options = 
IpcWriteOptions::default()
+.try_with_compression(Some(
+CompressionType::LZ4_FRAME,
+))?;
+let mut writer = 
IPCWriter::new_with_options(
 ,
 stream.schema().as_ref(),
+options,
 )?;
 
 writer.write(_batch)?;
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 3252ad72..e16c1b4c 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -25,6 +25,8 @@ use crate::serde::scheduler::PartitionStats;
 
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::ipc::writer::IpcWriteOptions;
+use datafusion::arrow::ipc::CompressionType;
 use datafusion::arrow::{ipc::writer::FileWriter, record_batch::RecordBatch};
 use datafusion::datasource::physical_plan::{CsvExec, ParquetExec};
 use datafusion::error::DataFusionError;
@@ -82,7 +84,12 @@ pub async fn write_stream_to_disk(
 let mut num_rows = 0;
 let mut num_batches = 0;
 let mut num_bytes = 0;
-let mut writer = FileWriter::try_new(file, stream.schema().as_ref())?;
+
+let options = IpcWriteOptions::default()
+.try_with_compression(Some(CompressionType::LZ4_FRAME))?;
+
+let mut writer =
+FileWriter::try_new_with_options(file, stream.schema().as_ref(), 
options)?;
 
 while let Some(result) = stream.next().await {
 let batch = result?;
diff --git a/ballista/executor/src/flight_service.rs 
b/ballista/executor/src/flight_service.rs
index 4a62bc5f..ed8ffb35 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -21,12 +21,14 @@ use std::convert::TryFrom;
 use std::fs::File;
 use std::pin::Pin;
 
-use arrow_flight::SchemaAsIpc;
+use arrow::ipc::CompressionType;
+use arrow_flight::encode::FlightDataEncoderBuilder;
+use arrow_flight::error::FlightError;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::decode_protobuf;
 use ballista_core::serde::scheduler::Action as BallistaAction;
 
-use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
+use arrow::ipc::writer::I

(arrow-datafusion) branch main updated: clean up the code based on Clippy (#8359)

2023-11-29 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new e93f8e13a7 clean up the code based on Clippy (#8359)
e93f8e13a7 is described below

commit e93f8e13a7f34f1d17f299ffcc1cbb103246d602
Author: Alex Huang 
AuthorDate: Wed Nov 29 17:40:49 2023 +0100

clean up the code based on Clippy (#8359)
---
 datafusion/optimizer/src/optimize_projections.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion/optimizer/src/optimize_projections.rs 
b/datafusion/optimizer/src/optimize_projections.rs
index 1e98ee76d2..b6d026279a 100644
--- a/datafusion/optimizer/src/optimize_projections.rs
+++ b/datafusion/optimizer/src/optimize_projections.rs
@@ -381,7 +381,7 @@ fn merge_consecutive_projections(proj: ) -> 
Result
 .flat_map(|expr| expr.to_columns())
 .fold(HashMap::new(), |mut map, cols| {
 cols.into_iter()
-.for_each(|col| *map.entry(col.clone()).or_default() += 1);
+.for_each(|col| *map.entry(col).or_default() += 1);
 map
 });
 
@@ -827,7 +827,7 @@ fn rewrite_projection_given_requirements(
 if _schema(, _used)? == input.schema() {
 Ok(Some(input))
 } else {
-let new_proj = Projection::try_new(exprs_used, 
Arc::new(input.clone()))?;
+let new_proj = Projection::try_new(exprs_used, Arc::new(input))?;
 let new_proj = LogicalPlan::Projection(new_proj);
 Ok(Some(new_proj))
 }



(arrow-ballista) branch use_lz4_compression updated (cc2cb396 -> 640208f6)

2023-11-29 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch use_lz4_compression
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


from cc2cb396 Backport improvements
 add 640208f6 More compression

No new revisions were added by this update.

Summary of changes:
 ballista/core/src/execution_plans/shuffle_writer.rs | 9 -
 1 file changed, 8 insertions(+), 1 deletion(-)



(arrow-ballista) branch main updated: Dynamically optimize aggregate based on shuffle stats (#919)

2023-11-28 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
 new e474e343 Dynamically optimize aggregate based on shuffle stats (#919)
e474e343 is described below

commit e474e343c683ffeedb6330ca284936cd25b1904b
Author: Daniël Heres 
AuthorDate: Tue Nov 28 14:34:58 2023 +

Dynamically optimize aggregate based on shuffle stats (#919)
---
 ballista/scheduler/src/state/execution_graph/execution_stage.rs | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs 
b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
index fcac54d5..f082fe43 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
@@ -22,6 +22,7 @@ use std::iter::FromIterator;
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 
+use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics;
 use datafusion::physical_optimizer::join_selection::JoinSelection;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
@@ -348,9 +349,13 @@ impl UnresolvedStage {
 _locations,
 )?;
 
-// Optimize join order based on new resolved statistics
+// Optimize join order and statistics based on new resolved statistics
 let optimize_join = JoinSelection::new();
-let plan = optimize_join.optimize(plan, 
SessionConfig::default().options())?;
+let config = SessionConfig::default();
+let plan = optimize_join.optimize(plan, config.options())?;
+let optimize_aggregate = AggregateStatistics::new();
+let plan =
+optimize_aggregate.optimize(plan, 
SessionConfig::default().options())?;
 
 Ok(ResolvedStage::new(
 self.stage_id,



(arrow-ballista) branch use_lz4_compression updated (c3886341 -> cc2cb396)

2023-11-28 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch use_lz4_compression
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


from c3886341 Add feature
 add cc2cb396 Backport improvements

No new revisions were added by this update.

Summary of changes:
 ballista/executor/src/flight_service.rs | 118 
 1 file changed, 43 insertions(+), 75 deletions(-)



(arrow-ballista) branch use_lz4_compression updated: Add feature

2023-11-27 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch use_lz4_compression
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/use_lz4_compression by this 
push:
 new c3886341 Add feature
c3886341 is described below

commit c388634146491b16eee6fee6415b74655befcd8c
Author: Daniël Heres 
AuthorDate: Mon Nov 27 18:47:36 2023 +0100

Add feature
---
 Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Cargo.toml b/Cargo.toml
index e4b5f324..fcdebab6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,7 +29,7 @@ members = [
 resolver = "2"
 
 [workspace.dependencies]
-arrow = { version = "48.0.0" }
+arrow = { version = "48.0.0", features=["ipc_compression"] }
 arrow-flight = { version = "48.0.0", features = ["flight-sql-experimental"] }
 arrow-schema = { version = "48.0.0", default-features = false }
 configure_me = { version = "0.4.0" }



(arrow-ballista) branch use_lz4_compression created (now 92e9a3c9)

2023-11-27 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch use_lz4_compression
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


  at 92e9a3c9 Use lz4 compression for shuffle files and streams

This branch includes the following new commits:

 new 92e9a3c9 Use lz4 compression for shuffle files and streams

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




(arrow-ballista) 01/01: Use lz4 compression for shuffle files and streams

2023-11-27 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch use_lz4_compression
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git

commit 92e9a3c93cb0a4364d81492e5edcc867029a5c79
Author: Daniël Heres 
AuthorDate: Mon Nov 27 18:45:36 2023 +0100

Use lz4 compression for shuffle files and streams
---
 ballista/core/src/utils.rs  | 9 -
 ballista/executor/src/flight_service.rs | 5 -
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 3252ad72..e16c1b4c 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -25,6 +25,8 @@ use crate::serde::scheduler::PartitionStats;
 
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::ipc::writer::IpcWriteOptions;
+use datafusion::arrow::ipc::CompressionType;
 use datafusion::arrow::{ipc::writer::FileWriter, record_batch::RecordBatch};
 use datafusion::datasource::physical_plan::{CsvExec, ParquetExec};
 use datafusion::error::DataFusionError;
@@ -82,7 +84,12 @@ pub async fn write_stream_to_disk(
 let mut num_rows = 0;
 let mut num_batches = 0;
 let mut num_bytes = 0;
-let mut writer = FileWriter::try_new(file, stream.schema().as_ref())?;
+
+let options = IpcWriteOptions::default()
+.try_with_compression(Some(CompressionType::LZ4_FRAME))?;
+
+let mut writer =
+FileWriter::try_new_with_options(file, stream.schema().as_ref(), 
options)?;
 
 while let Some(result) = stream.next().await {
 let batch = result?;
diff --git a/ballista/executor/src/flight_service.rs 
b/ballista/executor/src/flight_service.rs
index 4a62bc5f..7237ea0f 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -21,6 +21,7 @@ use std::convert::TryFrom;
 use std::fs::File;
 use std::pin::Pin;
 
+use arrow::ipc::CompressionType;
 use arrow_flight::SchemaAsIpc;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::decode_protobuf;
@@ -231,7 +232,9 @@ async fn stream_flight_data(
 where
 T: Read + Seek,
 {
-let options = arrow::ipc::writer::IpcWriteOptions::default();
+let options = arrow::ipc::writer::IpcWriteOptions::default()
+.try_with_compression(Some(CompressionType::LZ4_FRAME))
+.map_err(|x| from_arrow_err())?;
 let schema_flight_data = SchemaAsIpc::new(reader.schema().as_ref(), 
).into();
 send_response(, Ok(schema_flight_data)).await?;
 



(arrow-ballista) branch main updated: Refactor cache mod, remove linked_hash_map (#918)

2023-11-27 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
 new b8bd8fc0 Refactor cache mod, remove linked_hash_map (#918)
b8bd8fc0 is described below

commit b8bd8fc03cafaac09a10dd75cadf3032c2b4afff
Author: Chojan Shang 
AuthorDate: Mon Nov 27 22:13:36 2023 +0800

Refactor cache mod, remove linked_hash_map (#918)

Signed-off-by: Chojan Shang 
---
 ballista/cache/Cargo.toml  |1 +
 .../backend/policy/lru/hashlink/linked_hash_map.rs | 2215 
 .../cache/src/backend/policy/lru/hashlink/mod.rs   |   20 -
 .../backend/policy/lru/{hashlink => }/lru_cache.rs |   11 +-
 ballista/cache/src/backend/policy/lru/mod.rs   |2 +-
 ballista/cache/src/loading_cache/driver.rs |2 +-
 ballista/cache/src/metrics/loading_cache.rs|2 +-
 ballista/core/src/cache_layer/policy/file.rs   |2 +-
 8 files changed, 8 insertions(+), 2247 deletions(-)

diff --git a/ballista/cache/Cargo.toml b/ballista/cache/Cargo.toml
index e8ad7552..be89453e 100644
--- a/ballista/cache/Cargo.toml
+++ b/ballista/cache/Cargo.toml
@@ -26,6 +26,7 @@ edition = "2021"
 async-trait = "0.1.64"
 futures = "0.3"
 hashbrown = "0.14"
+hashlink = "0.8.4"
 log = "0.4"
 parking_lot = "0.12"
 tokio = { version = "1.25", features = ["macros", "parking_lot", 
"rt-multi-thread", "sync", "time"] }
diff --git a/ballista/cache/src/backend/policy/lru/hashlink/linked_hash_map.rs 
b/ballista/cache/src/backend/policy/lru/hashlink/linked_hash_map.rs
deleted file mode 100644
index c7a0c36f..
--- a/ballista/cache/src/backend/policy/lru/hashlink/linked_hash_map.rs
+++ /dev/null
@@ -1,2215 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-/// It's a fork version of hashlink[https://github.com/triplehex/hashlink] 
mainly to deal with version
-/// issue of hashbrown. Later we will propose PR to the original hashlink to 
update the hashbrown version
-use std::{
-alloc::Layout,
-borrow::Borrow,
-cmp::Ordering,
-fmt,
-hash::{BuildHasher, Hash, Hasher},
-iter::FromIterator,
-marker::PhantomData,
-mem::{self, MaybeUninit},
-ops::{Index, IndexMut},
-ptr::{self, NonNull},
-};
-
-use hashbrown::{hash_map, HashMap};
-
-pub enum TryReserveError {
-CapacityOverflow,
-AllocError { layout: Layout },
-}
-
-/// A version of `HashMap` that has a user controllable order for its entries.
-///
-/// It achieves this by keeping its entries in an internal linked list and 
using a `HashMap` to
-/// point at nodes in this linked list.
-///
-/// The order of entries defaults to "insertion order", but the user can also 
modify the order of
-/// existing entries by manually moving them to the front or back.
-///
-/// There are two kinds of methods that modify the order of the internal list:
-///
-/// * Methods that have names like `to_front` and `to_back` will 
unsurprisingly move an existing
-///   entry to the front or back
-/// * Methods that have the word `insert` will insert a new entry ot the back 
of the list, and if
-///   that method might replace an entry, that method will *also move that 
existing entry to the
-///   back*.
-pub struct LinkedHashMap {
-map: HashMap>, (), NullHasher>,
-// We need to keep any custom hash builder outside of the HashMap so we 
can access it alongside
-// the entry API without mutable aliasing.
-hash_builder: S,
-// Circular linked list of nodes.  If `values` is non-null, it will point 
to a "guard node"
-// which will never have an initialized key or value, `values.prev` will 
contain the last key /
-// value in the list, `values.next` will contain the first key / value in 
the list.
-values: Option>>,
-// *Singly* linked list of free nodes.  The `prev` pointers in the free 
list should be assumed
-// invalid.
-free: Option>>,
-}
-
-impl L

(arrow-ballista) branch dynamically_optimize_aggregate created (now 7306f319)

2023-11-27 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch dynamically_optimize_aggregate
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


  at 7306f319 Dynamically optimize aggregate based on shuffle stats

This branch includes the following new commits:

 new 7306f319 Dynamically optimize aggregate based on shuffle stats

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




(arrow-ballista) 01/01: Dynamically optimize aggregate based on shuffle stats

2023-11-27 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch dynamically_optimize_aggregate
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git

commit 7306f319f36b93d3c47715bf96063812b3ed1b7a
Author: Daniël Heres 
AuthorDate: Mon Nov 27 13:35:35 2023 +0100

Dynamically optimize aggregate based on shuffle stats
---
 ballista/scheduler/src/state/execution_graph/execution_stage.rs | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs 
b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
index fcac54d5..f082fe43 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
@@ -22,6 +22,7 @@ use std::iter::FromIterator;
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 
+use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics;
 use datafusion::physical_optimizer::join_selection::JoinSelection;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
@@ -348,9 +349,13 @@ impl UnresolvedStage {
 _locations,
 )?;
 
-// Optimize join order based on new resolved statistics
+// Optimize join order and statistics based on new resolved statistics
 let optimize_join = JoinSelection::new();
-let plan = optimize_join.optimize(plan, 
SessionConfig::default().options())?;
+let config = SessionConfig::default();
+let plan = optimize_join.optimize(plan, config.options())?;
+let optimize_aggregate = AggregateStatistics::new();
+let plan =
+optimize_aggregate.optimize(plan, 
SessionConfig::default().options())?;
 
 Ok(ResolvedStage::new(
 self.stage_id,



(arrow-ballista) branch main updated: Update to DataFusion 33 (#900)

2023-11-26 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
 new e4191dc7 Update to DataFusion 33 (#900)
e4191dc7 is described below

commit e4191dc7b76c00b319a7f04b0fdc452d6b2886eb
Author: Daniël Heres 
AuthorDate: Sun Nov 26 19:13:21 2023 +

Update to DataFusion 33 (#900)

* Update to DataFusion 33

* Test fixes

* Fix

* Upgrade crates

* Tests

* Tests

* Use parquet file instead
---
 Cargo.toml | 14 ++---
 ballista/client/src/columnar_batch.rs  |  8 ++-
 ballista/client/src/context.rs | 34 +--
 ballista/core/Cargo.toml   |  2 +-
 .../core/src/execution_plans/distributed_query.rs  |  4 +-
 .../core/src/execution_plans/shuffle_reader.rs | 68 +++---
 .../core/src/execution_plans/shuffle_writer.rs |  2 +-
 .../core/src/execution_plans/unresolved_shuffle.rs |  4 +-
 ballista/executor/src/collect.rs   |  2 +-
 ballista/executor/src/executor.rs  |  6 +-
 ballista/scheduler/Cargo.toml  |  6 +-
 ballista/scheduler/src/flight_sql.rs   |  5 +-
 examples/examples/standalone-sql.rs| 10 ++--
 13 files changed, 84 insertions(+), 81 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 6f34fa06..e4b5f324 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,16 +29,16 @@ members = [
 resolver = "2"
 
 [workspace.dependencies]
-arrow = { version = "47.0.0" }
-arrow-flight = { version = "47.0.0", features = ["flight-sql-experimental"] }
-arrow-schema = { version = "47.0.0", default-features = false }
+arrow = { version = "48.0.0" }
+arrow-flight = { version = "48.0.0", features = ["flight-sql-experimental"] }
+arrow-schema = { version = "48.0.0", default-features = false }
 configure_me = { version = "0.4.0" }
 configure_me_codegen = { version = "0.4.4" }
-datafusion = "32.0.0"
-datafusion-cli = "32.0.0"
-datafusion-proto = "32.0.0"
+datafusion = "33.0.0"
+datafusion-cli = "33.0.0"
+datafusion-proto = "33.0.0"
 object_store = "0.7.0"
-sqlparser = "0.38.0"
+sqlparser = "0.39.0"
 tonic = { version = "0.10" }
 tonic-build = { version = "0.10", default-features = false, features = [
 "transport",
diff --git a/ballista/client/src/columnar_batch.rs 
b/ballista/client/src/columnar_batch.rs
index 3431f561..5e7fe89b 100644
--- a/ballista/client/src/columnar_batch.rs
+++ b/ballista/client/src/columnar_batch.rs
@@ -147,10 +147,12 @@ impl ColumnarValue {
 }
 }
 
-pub fn to_arrow() -> ArrayRef {
+pub fn to_arrow() -> Result {
 match self {
-ColumnarValue::Columnar(array) => array.clone(),
-ColumnarValue::Scalar(value, n) => value.to_array_of_size(*n),
+ColumnarValue::Columnar(array) => Ok(array.clone()),
+ColumnarValue::Scalar(value, n) => {
+value.to_array_of_size(*n).map_err(|x| x.into())
+}
 }
 }
 
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index 76c8d439..82ca1710 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -839,7 +839,7 @@ mod tests {
 let res = df.collect().await.unwrap();
 let expected = vec![
 "+---+",
-"| VARIANCE(test.id) |",
+"| VAR(test.id)  |",
 "+---+",
 "| 6.001 |",
 "+---+",
@@ -852,11 +852,11 @@ mod tests {
 .unwrap();
 let res = df.collect().await.unwrap();
 let expected = vec![
-"+---+",
-"| VARIANCE_POP(test.id) |",
-"+---+",
-"| 5.251 |",
-"+---+",
+"+---+",
+"| VAR_POP(test.id)  |",
+"+---+",
+"| 5.251 |",
+"+---+",
 ];
 assert_result_eq(expected, );
 
@@ -867,7 +867,7 @@ mod tests {
 let res = df.collect().await.unwrap();
 let expected = vec![
 "+---+",
-"| VARIANCE(test.id) |",
+"| VAR(test.id)  |",
 "

(arrow-ballista) branch df33 updated (7fbd8d76 -> d864bda7)

2023-11-26 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch df33
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


from 7fbd8d76 Tests
 add d864bda7 Use parquet file instead

No new revisions were added by this update.

Summary of changes:
 examples/examples/standalone-sql.rs | 10 +-
 1 file changed, 5 insertions(+), 5 deletions(-)



(arrow-datafusion) branch main updated: Minor: remove useless clone based on Clippy (#8300)

2023-11-22 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 98f1bc0171 Minor: remove useless clone based on Clippy (#8300)
98f1bc0171 is described below

commit 98f1bc0171874d181aee8bc654bc81ab22314a29
Author: Alex Huang 
AuthorDate: Wed Nov 22 16:07:39 2023 +0100

Minor: remove useless clone based on Clippy (#8300)
---
 datafusion/expr/src/interval_arithmetic.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/expr/src/interval_arithmetic.rs 
b/datafusion/expr/src/interval_arithmetic.rs
index c85c6fc66b..5d34fe91c3 100644
--- a/datafusion/expr/src/interval_arithmetic.rs
+++ b/datafusion/expr/src/interval_arithmetic.rs
@@ -698,7 +698,7 @@ impl Interval {
 // We want 0 to be approachable from both negative and positive sides.
 let zero_point = match  {
 DataType::Float32 | DataType::Float64 => Self::new(zero.clone(), 
zero),
-_ => Self::new(prev_value(zero.clone()), next_value(zero.clone())),
+_ => Self::new(prev_value(zero.clone()), next_value(zero)),
 };
 
 // Exit early with an unbounded interval if zero is strictly inside the



(arrow-datafusion) branch main updated: Update prost-build requirement from =0.12.2 to =0.12.3 (#8298)

2023-11-22 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 1ba8724891 Update prost-build requirement from =0.12.2 to =0.12.3 
(#8298)
1ba8724891 is described below

commit 1ba87248912254bba073ecf6c65eaaf4845e9285
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
AuthorDate: Wed Nov 22 10:53:34 2023 +0100

Update prost-build requirement from =0.12.2 to =0.12.3 (#8298)

Updates the requirements on 
[prost-build](https://github.com/tokio-rs/prost) to permit the latest version.
- [Release notes](https://github.com/tokio-rs/prost/releases)
- [Commits](https://github.com/tokio-rs/prost/compare/v0.12.2...v0.12.3)

---
updated-dependencies:
- dependency-name: prost-build
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] 
Co-authored-by: dependabot[bot] 
<49699333+dependabot[bot]@users.noreply.github.com>
---
 datafusion/proto/gen/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/proto/gen/Cargo.toml b/datafusion/proto/gen/Cargo.toml
index f58357c6c5..8b3f3f98a8 100644
--- a/datafusion/proto/gen/Cargo.toml
+++ b/datafusion/proto/gen/Cargo.toml
@@ -32,4 +32,4 @@ publish = false
 [dependencies]
 # Pin these dependencies so that the generated output is deterministic
 pbjson-build = "=0.6.2"
-prost-build = "=0.12.2"
+prost-build = "=0.12.3"



(arrow-datafusion) branch main updated: [Benchmarks] Make `partitions` default to number of cores instead of 2 (#8292)

2023-11-21 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new afdabb260a [Benchmarks] Make `partitions` default to number of cores 
instead of 2 (#8292)
afdabb260a is described below

commit afdabb260a32e1d3e2119b48b93e47d851cf765f
Author: Andy Grove 
AuthorDate: Wed Nov 22 00:24:42 2023 -0700

[Benchmarks] Make `partitions` default to number of cores instead of 2 
(#8292)

* Default partitions to num cores

* update test
---
 benchmarks/src/sort.rs | 5 +++--
 benchmarks/src/tpch/run.rs | 6 +++---
 benchmarks/src/util/options.rs | 8 
 3 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/benchmarks/src/sort.rs b/benchmarks/src/sort.rs
index 5643c85619..224f2b19c7 100644
--- a/benchmarks/src/sort.rs
+++ b/benchmarks/src/sort.rs
@@ -148,8 +148,9 @@ impl RunOpt {
 println!("Executing '{title}' (sorting by: {expr:?})");
 rundata.start_new_case(title);
 for i in 0..self.common.iterations {
-let config =
-
SessionConfig::new().with_target_partitions(self.common.partitions);
+let config = SessionConfig::new().with_target_partitions(
+self.common.partitions.unwrap_or(num_cpus::get()),
+);
 let ctx = SessionContext::new_with_config(config);
 let (rows, elapsed) =
 exec_sort(, , _file, 
self.common.debug).await?;
diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
index 171b074d2a..5193d578fb 100644
--- a/benchmarks/src/tpch/run.rs
+++ b/benchmarks/src/tpch/run.rs
@@ -285,7 +285,7 @@ impl RunOpt {
 }
 
 fn partitions() -> usize {
-self.common.partitions
+self.common.partitions.unwrap_or(num_cpus::get())
 }
 }
 
@@ -325,7 +325,7 @@ mod tests {
 let path = get_tpch_data_path()?;
 let common = CommonOpt {
 iterations: 1,
-partitions: 2,
+partitions: Some(2),
 batch_size: 8192,
 debug: false,
 };
@@ -357,7 +357,7 @@ mod tests {
 let path = get_tpch_data_path()?;
 let common = CommonOpt {
 iterations: 1,
-partitions: 2,
+partitions: Some(2),
 batch_size: 8192,
 debug: false,
 };
diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs
index 1d86d10fb8..b9398e5b52 100644
--- a/benchmarks/src/util/options.rs
+++ b/benchmarks/src/util/options.rs
@@ -26,9 +26,9 @@ pub struct CommonOpt {
 #[structopt(short = "i", long = "iterations", default_value = "3")]
 pub iterations: usize,
 
-/// Number of partitions to process in parallel
-#[structopt(short = "n", long = "partitions", default_value = "2")]
-pub partitions: usize,
+/// Number of partitions to process in parallel. Defaults to number of 
available cores.
+#[structopt(short = "n", long = "partitions")]
+pub partitions: Option,
 
 /// Batch size when reading CSV or Parquet files
 #[structopt(short = "s", long = "batch-size", default_value = "8192")]
@@ -48,7 +48,7 @@ impl CommonOpt {
 /// Modify the existing config appropriately
 pub fn update_config(, config: SessionConfig) -> SessionConfig {
 config
-.with_target_partitions(self.partitions)
+.with_target_partitions(self.partitions.unwrap_or(num_cpus::get()))
 .with_batch_size(self.batch_size)
 }
 }



(arrow-datafusion) branch main updated: Minor: clean up the code based on Clippy (#8257)

2023-11-18 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 2e3f4344be Minor: clean up the code based on Clippy (#8257)
2e3f4344be is described below

commit 2e3f4344be4b520a1c58ab82b4171303b6826b65
Author: Alex Huang 
AuthorDate: Sat Nov 18 11:01:42 2023 +0100

Minor: clean up the code based on Clippy (#8257)
---
 datafusion/physical-expr/src/array_expressions.rs | 4 ++--
 datafusion/physical-plan/src/filter.rs| 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/datafusion/physical-expr/src/array_expressions.rs 
b/datafusion/physical-expr/src/array_expressions.rs
index 8bb70c3168..e2d22a0d33 100644
--- a/datafusion/physical-expr/src/array_expressions.rs
+++ b/datafusion/physical-expr/src/array_expressions.rs
@@ -582,14 +582,14 @@ pub fn array_except(args: &[ArrayRef]) -> 
Result {
 match (array1.data_type(), array2.data_type()) {
 (DataType::Null, _) | (_, DataType::Null) => Ok(array1.to_owned()),
 (DataType::List(field), DataType::List(_)) => {
-check_datatypes("array_except", &[, ])?;
+check_datatypes("array_except", &[array1, array2])?;
 let list1 = array1.as_list::();
 let list2 = array2.as_list::();
 let result = general_except::(list1, list2, field)?;
 Ok(Arc::new(result))
 }
 (DataType::LargeList(field), DataType::LargeList(_)) => {
-check_datatypes("array_except", &[, ])?;
+check_datatypes("array_except", &[array1, array2])?;
 let list1 = array1.as_list::();
 let list2 = array2.as_list::();
 let result = general_except::(list1, list2, field)?;
diff --git a/datafusion/physical-plan/src/filter.rs 
b/datafusion/physical-plan/src/filter.rs
index 107c95eff7..b6cd9fe79c 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -201,7 +201,7 @@ impl ExecutionPlan for FilterExec {
 // tracking issue for making this configurable:
 // https://github.com/apache/arrow-datafusion/issues/8133
 let selectivity = 0.2_f64;
-let mut stats = input_stats.clone().into_inexact();
+let mut stats = input_stats.into_inexact();
 stats.num_rows = 
stats.num_rows.with_estimated_selectivity(selectivity);
 stats.total_byte_size = stats
 .total_byte_size



(arrow-ballista) branch df33 updated (9fd4f0d2 -> 7fbd8d76)

2023-11-16 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch df33
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


from 9fd4f0d2 Tests
 add 7fbd8d76 Tests

No new revisions were added by this update.

Summary of changes:
 ballista/core/src/execution_plans/shuffle_reader.rs | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)



(arrow-ballista) branch df33 updated (c8a6427f -> 9fd4f0d2)

2023-11-16 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch df33
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


from c8a6427f Upgrade crates
 add 9fd4f0d2 Tests

No new revisions were added by this update.

Summary of changes:
 ballista/client/src/context.rs | 34 +-
 1 file changed, 17 insertions(+), 17 deletions(-)



(arrow-ballista) branch df33 updated (8d0ddc2f -> c8a6427f)

2023-11-16 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch df33
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


from 8d0ddc2f Fix
 add c8a6427f Upgrade crates

No new revisions were added by this update.

Summary of changes:
 ballista/core/Cargo.toml | 2 +-
 ballista/scheduler/Cargo.toml| 6 +++---
 ballista/scheduler/src/flight_sql.rs | 5 +++--
 3 files changed, 7 insertions(+), 6 deletions(-)



(arrow-ballista) branch df33 updated (3ed6b705 -> 8d0ddc2f)

2023-11-16 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch df33
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


from 3ed6b705 Test fixes
 add 8d0ddc2f Fix

No new revisions were added by this update.

Summary of changes:
 ballista/executor/src/executor.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



(arrow-ballista) branch df33 updated (5072a9bf -> 3ed6b705)

2023-11-16 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch df33
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


from 5072a9bf Update to DataFusion 33
 add 3ed6b705 Test fixes

No new revisions were added by this update.

Summary of changes:
 .../core/src/execution_plans/shuffle_reader.rs | 25 ++
 1 file changed, 11 insertions(+), 14 deletions(-)



(arrow-ballista) branch df33 created (now 5072a9bf)

2023-11-16 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch df33
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


  at 5072a9bf Update to DataFusion 33

This branch includes the following new commits:

 new 5072a9bf Update to DataFusion 33

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




(arrow-ballista) 01/01: Update to DataFusion 33

2023-11-16 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch df33
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git

commit 5072a9bf24d65ea0ce9b48163e36233b3261298f
Author: Daniël Heres 
AuthorDate: Thu Nov 16 20:23:59 2023 +0100

Update to DataFusion 33
---
 Cargo.toml | 14 +++
 ballista/client/src/columnar_batch.rs  |  8 ++--
 .../core/src/execution_plans/distributed_query.rs  |  4 +-
 .../core/src/execution_plans/shuffle_reader.rs | 43 --
 .../core/src/execution_plans/shuffle_writer.rs |  2 +-
 .../core/src/execution_plans/unresolved_shuffle.rs |  4 +-
 ballista/executor/src/collect.rs   |  2 +-
 ballista/executor/src/executor.rs  |  4 +-
 8 files changed, 43 insertions(+), 38 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 6f34fa06..e4b5f324 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,16 +29,16 @@ members = [
 resolver = "2"
 
 [workspace.dependencies]
-arrow = { version = "47.0.0" }
-arrow-flight = { version = "47.0.0", features = ["flight-sql-experimental"] }
-arrow-schema = { version = "47.0.0", default-features = false }
+arrow = { version = "48.0.0" }
+arrow-flight = { version = "48.0.0", features = ["flight-sql-experimental"] }
+arrow-schema = { version = "48.0.0", default-features = false }
 configure_me = { version = "0.4.0" }
 configure_me_codegen = { version = "0.4.4" }
-datafusion = "32.0.0"
-datafusion-cli = "32.0.0"
-datafusion-proto = "32.0.0"
+datafusion = "33.0.0"
+datafusion-cli = "33.0.0"
+datafusion-proto = "33.0.0"
 object_store = "0.7.0"
-sqlparser = "0.38.0"
+sqlparser = "0.39.0"
 tonic = { version = "0.10" }
 tonic-build = { version = "0.10", default-features = false, features = [
 "transport",
diff --git a/ballista/client/src/columnar_batch.rs 
b/ballista/client/src/columnar_batch.rs
index 3431f561..5e7fe89b 100644
--- a/ballista/client/src/columnar_batch.rs
+++ b/ballista/client/src/columnar_batch.rs
@@ -147,10 +147,12 @@ impl ColumnarValue {
 }
 }
 
-pub fn to_arrow() -> ArrayRef {
+pub fn to_arrow() -> Result {
 match self {
-ColumnarValue::Columnar(array) => array.clone(),
-ColumnarValue::Scalar(value, n) => value.to_array_of_size(*n),
+ColumnarValue::Columnar(array) => Ok(array.clone()),
+ColumnarValue::Scalar(value, n) => {
+value.to_array_of_size(*n).map_err(|x| x.into())
+}
 }
 }
 
diff --git a/ballista/core/src/execution_plans/distributed_query.rs 
b/ballista/core/src/execution_plans/distributed_query.rs
index ccb26206..13511173 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -210,11 +210,11 @@ impl ExecutionPlan for 
DistributedQueryExec {
 Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
 }
 
-fn statistics() -> Statistics {
+fn statistics() -> Result {
 // This execution plan sends the logical plan to the scheduler without
 // performing the node by node conversion to a full physical plan.
 // This implies that we cannot infer the statistics at this stage.
-Statistics::default()
+Ok(Statistics::new_unknown(()))
 }
 }
 
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs 
b/ballista/core/src/execution_plans/shuffle_reader.rs
index fa3f9f69..a18e1aef 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use async_trait::async_trait;
+use datafusion::common::stats::Precision;
 use std::any::Any;
 use std::collections::HashMap;
 use std::fmt::Debug;
@@ -37,8 +38,8 @@ use datafusion::error::Result;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::physical_plan::{
-DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, 
RecordBatchStream,
-SendableRecordBatchStream, Statistics,
+ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, 
Partitioning,
+RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
 use futures::{Stream, StreamExt, TryStreamExt};
 
@@ -172,37 +173,39 @@ impl ExecutionPlan for ShuffleReaderExec {
 Some(self.metrics.clone_inner())
 }
 
-fn statistics() -> Statistics {
-stats_for_partitions(
+fn statistics() -> Result {
+Ok(stats_for_partitions(
+self.schema.fields().len(),
 self.partition
  

(arrow-ballista) branch main updated: Upgrade datafusion to 32.0.0 (#899)

2023-11-16 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
 new c0561ed6 Upgrade datafusion to 32.0.0 (#899)
c0561ed6 is described below

commit c0561ed69a3cd0987bc109f21027370caac54d8b
Author: r.4ntix 
AuthorDate: Thu Nov 16 11:13:43 2023 -0600

Upgrade datafusion to 32.0.0 (#899)

* Upgrade datafusion to 32.0.0

* chore: make cargo fmt happy
---
 Cargo.toml | 18 
 ballista-cli/src/main.rs   |  2 +
 ballista/client/src/context.rs |  2 +-
 ballista/core/Cargo.toml   |  4 +-
 ballista/core/src/serde/generated/ballista.rs  | 51 ++
 ballista/core/src/serde/scheduler/to_proto.rs  | 10 -
 ballista/core/src/utils.rs |  6 +--
 ballista/executor/src/executor_process.rs  |  1 +
 ballista/executor/src/executor_server.rs   |  4 +-
 ballista/scheduler/Cargo.toml  |  6 +--
 ballista/scheduler/src/flight_sql.rs   | 12 ++---
 ballista/scheduler/src/state/execution_graph.rs|  9 ++--
 .../scheduler/src/state/execution_graph_dot.rs |  4 +-
 ballista/scheduler/src/state/executor_manager.rs   |  4 +-
 ballista/scheduler/src/state/session_manager.rs|  2 +-
 ballista/scheduler/src/state/task_manager.rs   |  3 +-
 ballista/scheduler/src/test_utils.rs   | 14 +++---
 benchmarks/src/bin/nyctaxi.rs  |  2 +-
 benchmarks/src/bin/tpch.rs | 10 ++---
 examples/Cargo.toml|  6 +--
 20 files changed, 95 insertions(+), 75 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index f8e46de5..6f34fa06 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,18 +29,18 @@ members = [
 resolver = "2"
 
 [workspace.dependencies]
-arrow = { version = "46.0.0" }
-arrow-flight = { version = "46.0.0", features = ["flight-sql-experimental"] }
-arrow-schema = { version = "46.0.0", default-features = false }
+arrow = { version = "47.0.0" }
+arrow-flight = { version = "47.0.0", features = ["flight-sql-experimental"] }
+arrow-schema = { version = "47.0.0", default-features = false }
 configure_me = { version = "0.4.0" }
 configure_me_codegen = { version = "0.4.4" }
-datafusion = "31.0.0"
-datafusion-cli = "31.0.0"
-datafusion-proto = "31.0.0"
+datafusion = "32.0.0"
+datafusion-cli = "32.0.0"
+datafusion-proto = "32.0.0"
 object_store = "0.7.0"
-sqlparser = "0.37.0"
-tonic = { version = "0.9" }
-tonic-build = { version = "0.9", default-features = false, features = [
+sqlparser = "0.38.0"
+tonic = { version = "0.10" }
+tonic-build = { version = "0.10", default-features = false, features = [
 "transport",
 "prost"
 ] }
diff --git a/ballista-cli/src/main.rs b/ballista-cli/src/main.rs
index 3f8f9ba8..67750429 100644
--- a/ballista-cli/src/main.rs
+++ b/ballista-cli/src/main.rs
@@ -23,6 +23,7 @@ use ballista_cli::{
 exec, print_format::PrintFormat, print_options::PrintOptions, 
BALLISTA_CLI_VERSION,
 };
 use clap::Parser;
+use datafusion_cli::print_options::MaxRows;
 use mimalloc::MiMalloc;
 
 #[global_allocator]
@@ -133,6 +134,7 @@ pub async fn main() -> Result<()> {
 let mut print_options = PrintOptions {
 format: args.format,
 quiet: args.quiet,
+maxrows: MaxRows::Unlimited,
 };
 
 let files = args.file;
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index a0671acf..76c8d439 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -357,7 +357,7 @@ impl BallistaContext {
 // the show tables、 show columns sql can not run at scheduler because 
the tables is store at client
 if is_show {
 let state = self.state.lock();
-ctx = Arc::new(SessionContext::with_config(
+ctx = Arc::new(SessionContext::new_with_config(
 SessionConfig::new().with_information_schema(
 state.config.default_with_information_schema(),
 ),
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 40239ae4..31a5d495 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -66,8 +66,8 @@ once_cell = "1.9.0"
 
 parking_lot = "0.12"
 parse_arg = "0.1.3"
-prost = "0.11"
-prost-types = "0.11"
+prost = "0.12"
+prost-types = "0.12"
 rand = "0.8"
 serde = { version = "1&q

(arrow-datafusion) branch main updated (4c6f5c5310 -> 37eecfe6ef)

2023-11-16 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


from 4c6f5c5310 Fix column indices in the planning tests (#8191)
 add 37eecfe6ef Remove unnecessary reassignment (#8232)

No new revisions were added by this update.

Summary of changes:
 datafusion/core/src/physical_planner.rs | 9 +++--
 1 file changed, 3 insertions(+), 6 deletions(-)



(arrow-datafusion) branch main updated: Minor: Encapsulate `LeftJoinData` into a struct (rather than anonymous enum) and add comments (#8153)

2023-11-14 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new fcd17c85c2 Minor: Encapsulate `LeftJoinData` into a struct (rather 
than anonymous enum) and add comments (#8153)
fcd17c85c2 is described below

commit fcd17c85c2eba1c5c8d92beb52f4351286f2dcea
Author: Andrew Lamb 
AuthorDate: Tue Nov 14 03:12:04 2023 -0500

Minor: Encapsulate `LeftJoinData` into a struct (rather than anonymous 
enum) and add comments (#8153)

* Minor: Encapsulate LeftJoinData into a struct (rather than anonymous enum)

* clippy
---
 datafusion/physical-plan/src/joins/hash_join.rs| 72 --
 .../physical-plan/src/joins/hash_join_utils.rs |  9 ++-
 2 files changed, 61 insertions(+), 20 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join.rs 
b/datafusion/physical-plan/src/joins/hash_join.rs
index 546a929bf9..da57fa07cc 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -73,7 +73,47 @@ use datafusion_physical_expr::EquivalenceProperties;
 use ahash::RandomState;
 use futures::{ready, Stream, StreamExt, TryStreamExt};
 
-type JoinLeftData = (JoinHashMap, RecordBatch, MemoryReservation);
+/// HashTable and input data for the left (build side) of a join
+struct JoinLeftData {
+/// The hash table with indices into `batch`
+hash_map: JoinHashMap,
+/// The input rows for the build side
+batch: RecordBatch,
+/// Memory reservation that tracks memory used by `hash_map` hash table
+/// `batch`. Cleared on drop.
+#[allow(dead_code)]
+reservation: MemoryReservation,
+}
+
+impl JoinLeftData {
+/// Create a new `JoinLeftData` from its parts
+fn new(
+hash_map: JoinHashMap,
+batch: RecordBatch,
+reservation: MemoryReservation,
+) -> Self {
+Self {
+hash_map,
+batch,
+reservation,
+}
+}
+
+/// Returns the number of rows in the build side
+fn num_rows() -> usize {
+self.batch.num_rows()
+}
+
+/// return a reference to the hash map
+fn hash_map() ->  {
+_map
+}
+
+/// returns a reference to the build side batch
+fn batch() ->  {
+
+}
+}
 
 /// Join execution plan: Evaluates eqijoin predicates in parallel on multiple
 /// partitions using a hash table and an optional filter list to apply post
@@ -692,8 +732,9 @@ async fn collect_left_input(
 // Merge all batches into a single batch, so we
 // can directly index into the arrays
 let single_batch = concat_batches(, , num_rows)?;
+let data = JoinLeftData::new(hashmap, single_batch, reservation);
 
-Ok((hashmap, single_batch, reservation))
+Ok(data)
 }
 
 /// Updates `hash` with new entries from [RecordBatch] evaluated against the 
expressions `on`,
@@ -770,7 +811,7 @@ struct HashJoinStream {
 left_fut: OnceFut,
 /// Which left (probe) side rows have been matches while creating output.
 /// For some OUTER joins, we need to know which rows have not been matched
-/// to produce the correct.
+/// to produce the correct output.
 visited_left_side: Option,
 /// right (probe) input
 right: SendableRecordBatchStream,
@@ -1042,13 +1083,13 @@ impl HashJoinStream {
 {
 // TODO: Replace `ceil` wrapper with stable `div_cell` after
 // https://github.com/rust-lang/rust/issues/88581
-let visited_bitmap_size = bit_util::ceil(left_data.1.num_rows(), 
8);
+let visited_bitmap_size = bit_util::ceil(left_data.num_rows(), 8);
 self.reservation.try_grow(visited_bitmap_size)?;
 self.join_metrics.build_mem_used.add(visited_bitmap_size);
 }
 
 let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
-let num_rows = left_data.1.num_rows();
+let num_rows = left_data.num_rows();
 if need_produce_result_in_final(self.join_type) {
 // Some join types need to track which row has be matched or 
unmatched:
 // `left semi` join:  need to use the bitmap to produce the 
matched row in the left side
@@ -1075,8 +1116,8 @@ impl HashJoinStream {
 
 // get the matched two indices for the on condition
 let left_right_indices = 
build_equal_condition_join_indices(
-_data.0,
-_data.1,
+left_data.hash_map(),
+left_data.batch(),
 ,
 _left,
 _right,
@@ -1108,7 +1149,7 @@ impl HashJoinStream {
 
 let result = build_batch_

(arrow-datafusion) branch main updated: Fix join order for TPCH Q17 & Q18 by improving FilterExec statistics (#8126)

2023-11-12 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 6fe00ce2e3 Fix join order for TPCH Q17 & Q18 by improving FilterExec 
statistics (#8126)
6fe00ce2e3 is described below

commit 6fe00ce2e30d2af2b64d3a97b877a96109c215f9
Author: Andy Grove 
AuthorDate: Sun Nov 12 01:41:15 2023 -0700

Fix join order for TPCH Q17 & Q18 by improving FilterExec statistics (#8126)

* Assume filters are highly selective if we cannot truly estimate 
cardinality

* fix regression

* cargo fmt

* simplify code

* Update datafusion/physical-plan/src/filter.rs

Co-authored-by: Daniël Heres 

* add comment with link to follow on issue

* Use default of 20% selectivity

* trigger CI

* remove files

* trigger CI

* address feedback

-

Co-authored-by: Daniël Heres 
---
 datafusion/physical-plan/src/filter.rs | 16 ++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/datafusion/physical-plan/src/filter.rs 
b/datafusion/physical-plan/src/filter.rs
index d560a219f2..822ddfdf3e 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -194,11 +194,23 @@ impl ExecutionPlan for FilterExec {
 fn statistics() -> Result {
 let predicate = self.predicate();
 
+let input_stats = self.input.statistics()?;
 let schema = self.schema();
 if !check_support(predicate, ) {
-return Ok(Statistics::new_unknown());
+// assume filter selects 20% of rows if we cannot do anything 
smarter
+// tracking issue for making this configurable:
+// https://github.com/apache/arrow-datafusion/issues/8133
+let selectivity = 0.2_f32;
+let mut stats = input_stats.clone().into_inexact();
+if let Precision::Inexact(n) = stats.num_rows {
+stats.num_rows = Precision::Inexact((selectivity * n as f32) 
as usize);
+}
+if let Precision::Inexact(n) = stats.total_byte_size {
+stats.total_byte_size =
+Precision::Inexact((selectivity * n as f32) as usize);
+}
+return Ok(stats);
 }
-let input_stats = self.input.statistics()?;
 
 let num_rows = input_stats.num_rows;
 let total_byte_size = input_stats.total_byte_size;



(arrow-datafusion) branch main updated: Update sqllogictest requirement from 0.17.0 to 0.18.0 (#8102)

2023-11-09 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 4512805c20 Update sqllogictest requirement from 0.17.0 to 0.18.0 
(#8102)
4512805c20 is described below

commit 4512805c2087d1a5538afdaba9d2e2ca5347c90c
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
AuthorDate: Thu Nov 9 11:01:43 2023 +0100

Update sqllogictest requirement from 0.17.0 to 0.18.0 (#8102)

Updates the requirements on 
[sqllogictest](https://github.com/risinglightdb/sqllogictest-rs) to permit the 
latest version.
- [Release notes](https://github.com/risinglightdb/sqllogictest-rs/releases)
- 
[Changelog](https://github.com/risinglightdb/sqllogictest-rs/blob/main/CHANGELOG.md)
- 
[Commits](https://github.com/risinglightdb/sqllogictest-rs/compare/v0.17.0...v0.18.0)

---
updated-dependencies:
- dependency-name: sqllogictest
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] 
Co-authored-by: dependabot[bot] 
<49699333+dependabot[bot]@users.noreply.github.com>
---
 datafusion/sqllogictest/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/sqllogictest/Cargo.toml 
b/datafusion/sqllogictest/Cargo.toml
index d27e88274f..4caec0e84b 100644
--- a/datafusion/sqllogictest/Cargo.toml
+++ b/datafusion/sqllogictest/Cargo.toml
@@ -46,7 +46,7 @@ object_store = { workspace = true }
 postgres-protocol = { version = "0.6.4", optional = true }
 postgres-types = { version = "0.2.4", optional = true }
 rust_decimal = { version = "1.27.0" }
-sqllogictest = "0.17.0"
+sqllogictest = "0.18.0"
 sqlparser = { workspace = true }
 tempfile = { workspace = true }
 thiserror = { workspace = true }



(arrow-datafusion) branch main updated (661d211a62 -> b2a1668183)

2023-11-03 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


from 661d211a62 Minor: Avoid recomputing compute_array_ndims in 
align_array_dimensions (#7963)
 add b2a1668183 Minor: fix doc check (#8037)

No new revisions were added by this update.

Summary of changes:
 datafusion/core/src/datasource/file_format/arrow.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)



(arrow-datafusion) branch main updated: doc: minor fix to SortExec::with_fetch comment (#8011)

2023-11-01 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new aef95edf9b doc: minor fix to SortExec::with_fetch comment (#8011)
aef95edf9b is described below

commit aef95edf9bf1324be8642146c882b8d4c89a3785
Author: Weston Pace 
AuthorDate: Wed Nov 1 02:05:50 2023 -0700

doc: minor fix to SortExec::with_fetch comment (#8011)
---
 datafusion/physical-plan/src/sorts/sort.rs | 8 +++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/sorts/sort.rs 
b/datafusion/physical-plan/src/sorts/sort.rs
index c7d676493f..08fa2c25d7 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -735,7 +735,13 @@ impl SortExec {
 self
 }
 
-/// Whether this `SortExec` preserves partitioning of the children
+/// Modify how many rows to include in the result
+///
+/// If None, then all rows will be returned, in sorted order.
+/// If Some, then only the top `fetch` rows will be returned.
+/// This can reduce the memory pressure required by the sort
+/// operation since rows that are not going to be included
+/// can be dropped.
 pub fn with_fetch(mut self, fetch: Option) -> Self {
 self.fetch = fetch;
 self



(arrow-datafusion) branch main updated: Create temporary files for reading or writing (#8005)

2023-11-01 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 3185783146 Create temporary files for reading or writing (#8005)
3185783146 is described below

commit 318578314676ef09f5a6b3408c7a91414344642a
Author: jokercurry <982458...@qq.com>
AuthorDate: Wed Nov 1 17:05:37 2023 +0800

Create temporary files for reading or writing (#8005)

* Create temporary files for reading or writing

* nit

* addr comment

-

Co-authored-by: zhongjingxiong 
---
 datafusion-examples/examples/dataframe.rs | 36 ++-
 1 file changed, 21 insertions(+), 15 deletions(-)

diff --git a/datafusion-examples/examples/dataframe.rs 
b/datafusion-examples/examples/dataframe.rs
index 26fddcd226..ea01c53b1c 100644
--- a/datafusion-examples/examples/dataframe.rs
+++ b/datafusion-examples/examples/dataframe.rs
@@ -18,7 +18,9 @@
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::error::Result;
 use datafusion::prelude::*;
-use std::fs;
+use std::fs::File;
+use std::io::Write;
+use tempfile::tempdir;
 
 /// This example demonstrates executing a simple query against an Arrow data 
source (Parquet) and
 /// fetching results, using the DataFrame trait
@@ -41,12 +43,19 @@ async fn main() -> Result<()> {
 // print the results
 df.show().await?;
 
+// create a csv file waiting to be written
+let dir = tempdir()?;
+let file_path = dir.path().join("example.csv");
+let file = File::create(_path)?;
+write_csv_file(file);
+
 // Reading CSV file with inferred schema example
-let csv_df = example_read_csv_file_with_inferred_schema().await;
+let csv_df =
+
example_read_csv_file_with_inferred_schema(file_path.to_str().unwrap()).await;
 csv_df.show().await?;
 
 // Reading CSV file with defined schema
-let csv_df = example_read_csv_file_with_schema().await;
+let csv_df = 
example_read_csv_file_with_schema(file_path.to_str().unwrap()).await;
 csv_df.show().await?;
 
 // Reading PARQUET file and print describe
@@ -59,31 +68,28 @@ async fn main() -> Result<()> {
 }
 
 // Function to create an test CSV file
-fn create_csv_file(path: String) {
+fn write_csv_file(mut file: File) {
 // Create the data to put into the csv file with headers
 let content = r#"id,time,vote,unixtime,rating
 a1,"10 6, 2013",3,1381017600,5.0
 a2,"08 9, 2013",2,1376006400,4.5"#;
 // write the data
-fs::write(path, content).expect("Problem with writing file!");
+file.write_all(content.as_ref())
+.expect("Problem with writing file!");
 }
 
 // Example to read data from a csv file with inferred schema
-async fn example_read_csv_file_with_inferred_schema() -> DataFrame {
-let path = "example.csv";
-// Create a csv file using the predefined function
-create_csv_file(path.to_string());
+async fn example_read_csv_file_with_inferred_schema(file_path: ) -> 
DataFrame {
 // Create a session context
 let ctx = SessionContext::new();
 // Register a lazy DataFrame using the context
-ctx.read_csv(path, CsvReadOptions::default()).await.unwrap()
+ctx.read_csv(file_path, CsvReadOptions::default())
+.await
+.unwrap()
 }
 
 // Example to read csv file with a defined schema for the csv file
-async fn example_read_csv_file_with_schema() -> DataFrame {
-let path = "example.csv";
-// Create a csv file using the predefined function
-create_csv_file(path.to_string());
+async fn example_read_csv_file_with_schema(file_path: ) -> DataFrame {
 // Create a session context
 let ctx = SessionContext::new();
 // Define the schema
@@ -101,5 +107,5 @@ async fn example_read_csv_file_with_schema() -> DataFrame {
 ..Default::default()
 };
 // Register a lazy DataFrame by using the context and option provider
-ctx.read_csv(path, csv_read_option).await.unwrap()
+ctx.read_csv(file_path, csv_read_option).await.unwrap()
 }



(arrow-datafusion) branch main updated: Fix crate READMEs for core, execution, physical-plan (#7990)

2023-10-31 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new d8e413c0b9 Fix crate READMEs for core, execution, physical-plan (#7990)
d8e413c0b9 is described below

commit d8e413c0b92b86593ed9801a034bd62bdb9ddc0b
Author: Jeffrey <22608443+jefff...@users.noreply.github.com>
AuthorDate: Tue Oct 31 19:11:42 2023 +1100

Fix crate READMEs for core, execution, physical-plan (#7990)
---
 datafusion/core/Cargo.toml  | 2 +-
 datafusion/execution/Cargo.toml | 2 +-
 datafusion/physical-plan/Cargo.toml | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 5f9d28bd62..a961493467 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -20,9 +20,9 @@ name = "datafusion"
 description = "DataFusion is an in-memory query engine that uses Apache Arrow 
as the memory model"
 keywords = ["arrow", "query", "sql"]
 include = ["benches/*.rs", "src/**/*.rs", "Cargo.toml"]
+readme = "README.md"
 version = { workspace = true }
 edition = { workspace = true }
-readme = { workspace = true }
 homepage = { workspace = true }
 repository = { workspace = true }
 license = { workspace = true }
diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml
index 6ae8bccdae..54a2a6d8f3 100644
--- a/datafusion/execution/Cargo.toml
+++ b/datafusion/execution/Cargo.toml
@@ -19,9 +19,9 @@
 name = "datafusion-execution"
 description = "Execution configuration support for DataFusion query engine"
 keywords = ["arrow", "query", "sql"]
+readme = "README.md"
 version = { workspace = true }
 edition = { workspace = true }
-readme = { workspace = true }
 homepage = { workspace = true }
 repository = { workspace = true }
 license = { workspace = true }
diff --git a/datafusion/physical-plan/Cargo.toml 
b/datafusion/physical-plan/Cargo.toml
index 2dfcf12e35..033d2aa187 100644
--- a/datafusion/physical-plan/Cargo.toml
+++ b/datafusion/physical-plan/Cargo.toml
@@ -19,9 +19,9 @@
 name = "datafusion-physical-plan"
 description = "Physical (ExecutionPlan) implementations for DataFusion query 
engine"
 keywords = ["arrow", "query", "sql"]
+readme = "README.md"
 version = { workspace = true }
 edition = { workspace = true }
-readme = { workspace = true }
 homepage = { workspace = true }
 repository = { workspace = true }
 license = { workspace = true }



(arrow-datafusion) branch main updated: chore: clean useless clone baesd on clippy (#7973)

2023-10-29 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 9b45967edc chore: clean useless clone baesd on clippy (#7973)
9b45967edc is described below

commit 9b45967edc6dba312ea223464dad3e66604d2095
Author: Alex Huang 
AuthorDate: Sun Oct 29 22:45:34 2023 +0100

chore: clean useless clone baesd on clippy (#7973)
---
 datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs | 2 +-
 datafusion/core/src/physical_optimizer/enforce_distribution.rs  | 2 +-
 datafusion/core/src/physical_optimizer/topk_aggregation.rs  | 2 +-
 datafusion/physical-expr/src/array_expressions.rs   | 2 +-
 datafusion/substrait/src/logical_plan/producer.rs   | 2 +-
 5 files changed, 5 insertions(+), 5 deletions(-)

diff --git 
a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs 
b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index 838ae61368..2c4e929788 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -93,7 +93,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
 
input_agg_exec.filter_expr().to_vec(),
 
input_agg_exec.order_by_expr().to_vec(),
 input_agg_exec.input().clone(),
-
input_agg_exec.input_schema().clone(),
+input_agg_exec.input_schema(),
 )
 .ok()
 .map(Arc::new)
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 7b91dce32a..12df9efbbc 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -554,7 +554,7 @@ fn reorder_aggregate_keys(
 agg_exec.filter_expr().to_vec(),
 agg_exec.order_by_expr().to_vec(),
 partial_agg,
-agg_exec.input_schema().clone(),
+agg_exec.input_schema(),
 )?);
 
 // Need to create a new projection to change the expr 
ordering back
diff --git a/datafusion/core/src/physical_optimizer/topk_aggregation.rs 
b/datafusion/core/src/physical_optimizer/topk_aggregation.rs
index 572e796a8b..e0a8da82e3 100644
--- a/datafusion/core/src/physical_optimizer/topk_aggregation.rs
+++ b/datafusion/core/src/physical_optimizer/topk_aggregation.rs
@@ -75,7 +75,7 @@ impl TopKAggregation {
 aggr.filter_expr().to_vec(),
 aggr.order_by_expr().to_vec(),
 aggr.input().clone(),
-aggr.input_schema().clone(),
+aggr.input_schema(),
 )
 .expect("Unable to copy Aggregate!")
 .with_limit(Some(limit));
diff --git a/datafusion/physical-expr/src/array_expressions.rs 
b/datafusion/physical-expr/src/array_expressions.rs
index 7077f8b598..84fd301b84 100644
--- a/datafusion/physical-expr/src/array_expressions.rs
+++ b/datafusion/physical-expr/src/array_expressions.rs
@@ -811,7 +811,7 @@ fn concat_internal(args: &[ArrayRef]) -> Result {
 }
 }
 // Assume all arrays have the same data type
-let data_type = list_arrays[0].value_type().clone();
+let data_type = list_arrays[0].value_type();
 let buffer = valid.finish();
 
 let elements = arrays
diff --git a/datafusion/substrait/src/logical_plan/producer.rs 
b/datafusion/substrait/src/logical_plan/producer.rs
index 757bddf9fe..e3c6f94d43 100644
--- a/datafusion/substrait/src/logical_plan/producer.rs
+++ b/datafusion/substrait/src/logical_plan/producer.rs
@@ -326,7 +326,7 @@ pub fn to_substrait_rel(
 left: Some(left),
 right: Some(right),
 r#type: join_type as i32,
-expression: join_expr.clone(),
+expression: join_expr,
 post_join_filter: None,
 advanced_extension: None,
 }))),



[arrow-datafusion] branch main updated: support scan empty projection (#7920)

2023-10-25 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new af2cda9281 support scan empty projection  (#7920)
af2cda9281 is described below

commit af2cda928140f85d369e26931756a0cbc127e9ec
Author: Huaijin 
AuthorDate: Wed Oct 25 15:56:13 2023 +0800

support scan empty projection  (#7920)

* fix

* modify push_down_filter for empty projection

* clean code

* fix ci

* fix

* Update 
datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs

Co-authored-by: Daniël Heres 

* remove LogicalPlan::Values in push_down_projection

-

Co-authored-by: Daniël Heres 
---
 .../datasource/physical_plan/file_scan_config.rs   |  10 +-
 .../provider_filter_pushdown.rs|  14 +-
 datafusion/core/tests/sql/explain_analyze.rs   |   2 +-
 datafusion/optimizer/src/push_down_projection.rs   | 168 +
 datafusion/physical-plan/src/joins/cross_join.rs   |   4 +-
 datafusion/sqllogictest/test_files/avro.slt|   4 +-
 datafusion/sqllogictest/test_files/json.slt|   4 +-
 datafusion/sqllogictest/test_files/subquery.slt|   6 +-
 8 files changed, 31 insertions(+), 181 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs 
b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index d8a9697b2b..3efb0df9df 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -36,7 +36,7 @@ use crate::{
 use arrow::array::{ArrayData, BufferBuilder};
 use arrow::buffer::Buffer;
 use arrow::datatypes::{ArrowNativeType, UInt16Type};
-use arrow_array::{ArrayRef, DictionaryArray, RecordBatch};
+use arrow_array::{ArrayRef, DictionaryArray, RecordBatch, RecordBatchOptions};
 use arrow_schema::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::stats::Precision;
 use datafusion_common::{exec_err, ColumnStatistics, Statistics};
@@ -339,7 +339,13 @@ impl PartitionColumnProjector {
 ),
 )
 }
-RecordBatch::try_new(Arc::clone(_schema), 
cols).map_err(Into::into)
+
+RecordBatch::try_new_with_options(
+Arc::clone(_schema),
+cols,
+
::new().with_row_count(Some(file_batch.num_rows())),
+)
+.map_err(Into::into)
 }
 }
 
diff --git 
a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs 
b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
index 4679ca6d07..e374abd6e8 100644
--- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
@@ -149,10 +149,12 @@ impl TableProvider for CustomProvider {
 async fn scan(
 ,
 _state: ,
-_: Option<>,
+projection: Option<>,
 filters: &[Expr],
 _: Option,
 ) -> Result> {
+let empty = Vec::new();
+let projection = projection.unwrap_or();
 match [0] {
 Expr::BinaryExpr(BinaryExpr { right, .. }) => {
 let int_value = match &**right {
@@ -182,7 +184,10 @@ impl TableProvider for CustomProvider {
 };
 
 Ok(Arc::new(CustomPlan {
-schema: self.zero_batch.schema(),
+schema: match projection.is_empty() {
+true => Arc::new(Schema::empty()),
+false => self.zero_batch.schema(),
+},
 batches: match int_value {
 0 => vec![self.zero_batch.clone()],
 1 => vec![self.one_batch.clone()],
@@ -191,7 +196,10 @@ impl TableProvider for CustomProvider {
 }))
 }
 _ => Ok(Arc::new(CustomPlan {
-schema: self.zero_batch.schema(),
+schema: match projection.is_empty() {
+true => Arc::new(Schema::empty()),
+false => self.zero_batch.schema(),
+},
 batches: vec![],
 })),
 }
diff --git a/datafusion/core/tests/sql/explain_analyze.rs 
b/datafusion/core/tests/sql/explain_analyze.rs
index 7238369f83..2436e82f3c 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -788,7 +788,7 @@ async fn explain_logical_plan_only() {
 "logical_plan",
 "Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]\
 \n  SubqueryAlias: t\
-\nProjection: column2\
+\nProjection: \
  

[arrow-datafusion] branch main updated: Change `FileScanConfig.table_partition_cols` from `(String, DataType)` to `Field`s (#7890)

2023-10-22 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new ca5dc8c066 Change `FileScanConfig.table_partition_cols` from `(String, 
DataType)` to `Field`s (#7890)
ca5dc8c066 is described below

commit ca5dc8c066bb6da34a1f8522b6127138358a3159
Author: Nga Tran 
AuthorDate: Sun Oct 22 13:00:49 2023 -0400

Change `FileScanConfig.table_partition_cols` from `(String, DataType)` to 
`Field`s (#7890)

* feat: make data type of FileScanConfig.table_partition_cols a vector of 
Fields

* fix: avro test

* chore: Apply suggestions from code review

Co-authored-by: Andrew Lamb 

* chore: address review comments

* chore: remove uncessary to_owned

-

Co-authored-by: Andrew Lamb 
---
 datafusion/core/src/datasource/listing/table.rs| 10 +---
 .../core/src/datasource/physical_plan/avro.rs  |  2 +-
 .../core/src/datasource/physical_plan/csv.rs   |  2 +-
 .../datasource/physical_plan/file_scan_config.rs   | 54 ++
 .../src/datasource/physical_plan/file_stream.rs|  2 +-
 .../core/src/datasource/physical_plan/parquet.rs   |  9 ++--
 datafusion/proto/src/physical_plan/from_proto.rs   | 10 +---
 datafusion/proto/src/physical_plan/to_proto.rs |  2 +-
 8 files changed, 57 insertions(+), 34 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 485ab0a902..bd878932d8 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -746,15 +746,7 @@ impl TableProvider for ListingTable {
 .options
 .table_partition_cols
 .iter()
-.map(|col| {
-Ok((
-col.0.to_owned(),
-self.table_schema
-.field_with_name()?
-.data_type()
-.clone(),
-))
-})
+.map(|col| Ok(self.table_schema.field_with_name()?.clone()))
 .collect::>>()?;
 
 let filters = if let Some(expr) = conjunction(filters.to_vec()) {
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs 
b/datafusion/core/src/datasource/physical_plan/avro.rs
index f08bc9b8df..237772eb83 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -420,7 +420,7 @@ mod tests {
 statistics: Statistics::new_unknown(_schema),
 file_schema,
 limit: None,
-table_partition_cols: vec![("date".to_owned(), DataType::Utf8)],
+table_partition_cols: vec![Field::new("date", DataType::Utf8, 
false)],
 output_ordering: vec![],
 infinite_source: false,
 });
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs 
b/datafusion/core/src/datasource/physical_plan/csv.rs
index f3b2fa9de7..e60a249b0b 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -871,7 +871,7 @@ mod tests {
 let mut config = partitioned_csv_config(file_schema, file_groups)?;
 
 // Add partition columns
-config.table_partition_cols = vec![("date".to_owned(), 
DataType::Utf8)];
+config.table_partition_cols = vec![Field::new("date", DataType::Utf8, 
false)];
 config.file_groups[0][0].partition_values =
 vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))];
 
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs 
b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index c1a19b745b..d8a9697b2b 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -101,7 +101,7 @@ pub struct FileScanConfig {
 /// all records after filtering are returned.
 pub limit: Option,
 /// The partitioning columns
-pub table_partition_cols: Vec<(String, DataType)>,
+pub table_partition_cols: Vec,
 /// All equivalent lexicographical orderings that describe the schema.
 pub output_ordering: Vec,
 /// Indicates whether this plan may produce an infinite stream of records.
@@ -135,8 +135,7 @@ impl FileScanConfig {
 
table_cols_stats.push(self.statistics.column_statistics[idx].clone())
 } else {
 let partition_idx = idx - self.file_schema.fields().len();
-let (name, dtype) = _partition_cols[partition_idx];
-table_fields.push(Field::new(name, dtype.to_owned(), false));
+
table_fields.push(self.table_parti

[arrow-datafusion] branch main updated: Add multi-column topk fuzz tests (#7898)

2023-10-22 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new dae1efb00e Add multi-column topk fuzz tests (#7898)
dae1efb00e is described below

commit dae1efb00ee47a766261160e8087545551929801
Author: Andrew Lamb 
AuthorDate: Sun Oct 22 12:59:49 2023 -0400

Add multi-column topk fuzz tests (#7898)

* Add multi-column topk tests

* clippy

* fix validation

* Update docs
---
 datafusion/core/tests/fuzz_cases/limit_fuzz.rs | 349 +
 datafusion/core/tests/fuzz_cases/mod.rs|   2 +
 datafusion/core/tests/fuzz_cases/sort_fuzz.rs  | 221 +---
 test-utils/src/lib.rs  |   2 +-
 4 files changed, 355 insertions(+), 219 deletions(-)

diff --git a/datafusion/core/tests/fuzz_cases/limit_fuzz.rs 
b/datafusion/core/tests/fuzz_cases/limit_fuzz.rs
new file mode 100644
index 00..9889ce2ae5
--- /dev/null
+++ b/datafusion/core/tests/fuzz_cases/limit_fuzz.rs
@@ -0,0 +1,349 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Fuzz Test for Sort + Fetch/Limit (TopK!)
+
+use arrow::compute::concat_batches;
+use arrow::util::pretty::pretty_format_batches;
+use arrow::{array::Int32Array, record_batch::RecordBatch};
+use arrow_array::{Float64Array, Int64Array, StringArray};
+use arrow_schema::SchemaRef;
+use datafusion::datasource::MemTable;
+use datafusion::prelude::SessionContext;
+use datafusion_common::assert_contains;
+use rand::{thread_rng, Rng};
+use std::sync::Arc;
+use test_utils::stagger_batch;
+
+#[tokio::test]
+async fn test_sort_topk_i32() {
+run_limit_fuzz_test(SortedData::new_i32).await
+}
+
+#[tokio::test]
+async fn test_sort_topk_f64() {
+run_limit_fuzz_test(SortedData::new_f64).await
+}
+
+#[tokio::test]
+async fn test_sort_topk_str() {
+run_limit_fuzz_test(SortedData::new_str).await
+}
+
+#[tokio::test]
+async fn test_sort_topk_i64str() {
+run_limit_fuzz_test(SortedData::new_i64str).await
+}
+
+/// Run TopK fuzz tests the specified input data with different
+/// different test functions so they can run in parallel)
+async fn run_limit_fuzz_test(make_data: F)
+where
+F: Fn(usize) -> SortedData,
+{
+let mut rng = thread_rng();
+for size in [10, 1_, 10_000, 100_000] {
+let data = make_data(size);
+// test various limits including some random ones
+for limit in [1, 3, 7, 17, 1, rng.gen_range(1..size * 2)] {
+//  limit can be larger than the number of rows in the input
+run_limit_test(limit, ).await;
+}
+}
+}
+
+/// The data column(s) to use for the TopK test
+///
+/// Each variants stores the input batches and the expected sorted values
+/// compute the expected output for a given fetch (limit) value.
+#[derive(Debug)]
+enum SortedData {
+// single Int32 column
+I32 {
+batches: Vec,
+sorted: Vec>,
+},
+/// Single Float64 column
+F64 {
+batches: Vec,
+sorted: Vec>,
+},
+/// Single sorted String column
+Str {
+batches: Vec,
+sorted: Vec>,
+},
+/// (i64, string) columns
+I64Str {
+batches: Vec,
+sorted: Vec<(Option, Option)>,
+},
+}
+
+impl SortedData {
+/// Create an i32 column of random values, with the specified number of
+/// rows, sorted the default
+fn new_i32(size: usize) -> Self {
+let mut rng = thread_rng();
+// have some repeats (approximately 1/3 of the values are the same)
+let max = size as i32 / 3;
+let data: Vec> = (0..size)
+.map(|_| {
+// no nulls for now
+Some(rng.gen_range(0..max))
+})
+.collect();
+
+let batches = stagger_batch(int32_batch(data.iter().cloned()));
+
+let mut sorted = data;
+sorted.sort_unstable();
+
+Self::I32 { batches, sorted }
+}
+
+/// Create an f64 column of random values, with the specified number of
+/// rows, so

[arrow-datafusion] branch main updated: Drop single quotes to make warnings for parquet options not confusing (#7902)

2023-10-22 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new f10036eb4e Drop single quotes to make warnings for parquet options not 
confusing (#7902)
f10036eb4e is described below

commit f10036eb4e34daba72b194cc32a1b55d7b5bc120
Author: Kirill Zaborsky 
AuthorDate: Sun Oct 22 15:57:01 2023 +0300

Drop single quotes to make warnings for parquet options not confusing 
(#7902)

Fixes #7867
---
 datafusion/common/src/file_options/parquet_writer.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion/common/src/file_options/parquet_writer.rs 
b/datafusion/common/src/file_options/parquet_writer.rs
index 9364cf..80fa023587 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -342,7 +342,7 @@ pub(crate) fn parse_version_string(str_setting: ) -> 
Result {
 "2.0" => Ok(WriterVersion::PARQUET_2_0),
 _ => Err(DataFusionError::Configuration(format!(
 "Unknown or unsupported parquet writer version {str_setting} \
-valid options are '1.0' and '2.0'"
+valid options are 1.0 and 2.0"
 ))),
 }
 }
@@ -355,7 +355,7 @@ pub(crate) fn parse_statistics_string(str_setting: ) -> 
Result Ok(EnabledStatistics::Page),
 _ => Err(DataFusionError::Configuration(format!(
 "Unknown or unsupported parquet statistics setting {str_setting} \
-valid options are 'none', 'page', and 'chunk'"
+valid options are none, page, and chunk"
 ))),
 }
 }



[arrow-datafusion] branch maintain_time_zone updated (8f8140365b -> 9dbddbd648)

2023-10-21 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch maintain_time_zone
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


 discard 8f8140365b Maintain time_zone in new_list
 add 9dbddbd648 Maintain time_zone in new_list

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (8f8140365b)
\
 N -- N -- N   refs/heads/maintain_time_zone (9dbddbd648)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

No new revisions were added by this update.

Summary of changes:
 parquet-testing | 2 +-
 testing | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)



[arrow-datafusion] branch maintain_time_zone created (now 8f8140365b)

2023-10-21 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch maintain_time_zone
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


  at 8f8140365b Maintain time_zone in new_list

This branch includes the following new commits:

 new 8f8140365b Maintain time_zone in new_list

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[arrow-datafusion] 01/01: Maintain time_zone in new_list

2023-10-21 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch maintain_time_zone
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 8f8140365b994c6607462ccabf488500d752d524
Author: Daniël Heres 
AuthorDate: Sun Oct 22 00:39:50 2023 +0200

Maintain time_zone in new_list
---
 datafusion/common/src/scalar.rs | 38 +++---
 parquet-testing |  2 +-
 testing |  2 +-
 3 files changed, 33 insertions(+), 9 deletions(-)

diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 2c3dd4c5ca..be24e2b933 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -620,26 +620,30 @@ macro_rules! build_timestamp_list {
 TimestampSecondBuilder,
 TimestampSecond,
 values,
-$SIZE
+$SIZE,
+$TIME_ZONE
 )
 }
 TimeUnit::Millisecond => build_values_list_tz!(
 TimestampMillisecondBuilder,
 TimestampMillisecond,
 values,
-$SIZE
+$SIZE,
+$TIME_ZONE
 ),
 TimeUnit::Microsecond => build_values_list_tz!(
 TimestampMicrosecondBuilder,
 TimestampMicrosecond,
 values,
-$SIZE
+$SIZE,
+$TIME_ZONE
 ),
 TimeUnit::Nanosecond => build_values_list_tz!(
 TimestampNanosecondBuilder,
 TimestampNanosecond,
 values,
-$SIZE
+$SIZE,
+$TIME_ZONE
 ),
 },
 }
@@ -683,9 +687,10 @@ macro_rules! build_values_list {
 }
 
 macro_rules! build_values_list_tz {
-($VALUE_BUILDER_TY:ident, $SCALAR_TY:ident, $VALUES:expr, $SIZE:expr) => {{
-let mut builder =
-ListBuilder::new($VALUE_BUILDER_TY::with_capacity($VALUES.len()));
+($VALUE_BUILDER_TY:ident, $SCALAR_TY:ident, $VALUES:expr, $SIZE:expr, 
$TIME_ZONE:expr) => {{
+let mut builder = ListBuilder::new(
+
$VALUE_BUILDER_TY::with_capacity($VALUES.len()).with_timezone_opt($TIME_ZONE),
+);
 
 for _ in 0..$SIZE {
 for scalar_value in $VALUES {
@@ -5185,6 +5190,25 @@ mod tests {
 assert_eq!(1, arr.len());
 }
 
+#[test]
+fn test_newlist_timestamp_zone() {
+let s: &'static str = "UTC";
+let values = vec![ScalarValue::TimestampMillisecond(Some(1), 
Some(s.into()))];
+let arr = ScalarValue::new_list(
+,
+::Timestamp(TimeUnit::Millisecond, Some(s.into())),
+);
+assert_eq!(1, arr.len());
+assert_eq!(
+arr.data_type(),
+::List(Arc::new(Field::new(
+"item",
+DataType::Timestamp(TimeUnit::Millisecond, Some(s.into())),
+true
+)))
+);
+}
+
 fn get_random_timestamps(sample_size: u64) -> Vec {
 let vector_size = sample_size;
 let mut timestamp = vec![];
diff --git a/parquet-testing b/parquet-testing
index e45cd23f78..a11fc8f148 16
--- a/parquet-testing
+++ b/parquet-testing
@@ -1 +1 @@
-Subproject commit e45cd23f784aab3d6bf0701f8f4e621469ed3be7
+Subproject commit a11fc8f148f8a7a89d9281cc0da3eb9d56095fbf
diff --git a/testing b/testing
index 98fceecd02..5bab2f264a 16
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 98fceecd024dccd2f8a00e32fc144975f218acf4
+Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88



[arrow-datafusion] branch main updated: Add Decimal256 sqllogictests for SUM, MEDIAN and COUNT aggregate expressions (#7889)

2023-10-21 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new d6d32a Add Decimal256 sqllogictests for SUM, MEDIAN and COUNT 
aggregate expressions (#7889)
d6d32a is described below

commit d6d32a6695852966c2513c66027e59dbec5e
Author: Liang-Chi Hsieh 
AuthorDate: Sat Oct 21 03:27:29 2023 -0700

Add Decimal256 sqllogictests for SUM, MEDIAN and COUNT aggregate 
expressions (#7889)

* Add Decimal256 sqllogictests test for SUM aggregate

* Add median and avg sqllogictests tests

* Remove duplicate

* Add count test
---
 datafusion/sqllogictest/test_files/decimal.slt | 19 +++
 1 file changed, 19 insertions(+)

diff --git a/datafusion/sqllogictest/test_files/decimal.slt 
b/datafusion/sqllogictest/test_files/decimal.slt
index 87a846c077..c220a5fc9a 100644
--- a/datafusion/sqllogictest/test_files/decimal.slt
+++ b/datafusion/sqllogictest/test_files/decimal.slt
@@ -701,5 +701,24 @@ select arrow_typeof(max(c1)), max(c1) from 
decimal256_simple where c4=false;
 
 Decimal256(50, 6) 0.5
 
+query TR
+select arrow_typeof(sum(c1)), sum(c1) from decimal256_simple;
+
+Decimal256(60, 6) 0.00055
+
+query TR
+select arrow_typeof(median(c1)), median(c1) from decimal256_simple;
+
+Decimal256(50, 6) 0.4
+
+query IR
+select count(*),c1 from decimal256_simple group by c1 order by c1;
+
+1 0.1
+2 0.2
+3 0.3
+4 0.4
+5 0.5
+
 statement ok
 drop table decimal256_simple;



[arrow-datafusion] branch main updated: Support Decimal256 in Min/Max aggregate expressions (#7881)

2023-10-20 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 113a7bdc5a Support Decimal256 in Min/Max aggregate expressions (#7881)
113a7bdc5a is described below

commit 113a7bdc5a093448e0d2884de9976cb27ee1d7e7
Author: Liang-Chi Hsieh 
AuthorDate: Fri Oct 20 05:13:15 2023 -0700

Support Decimal256 in Min/Max aggregate expressions (#7881)
---
 datafusion/physical-expr/src/aggregate/min_max.rs | 34 +++
 datafusion/sqllogictest/test_files/decimal.slt| 10 +++
 2 files changed, 44 insertions(+)

diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs 
b/datafusion/physical-expr/src/aggregate/min_max.rs
index 5c4c48b158..f5b708e889 100644
--- a/datafusion/physical-expr/src/aggregate/min_max.rs
+++ b/datafusion/physical-expr/src/aggregate/min_max.rs
@@ -53,6 +53,9 @@ use crate::aggregate::utils::down_cast_any_ref;
 use crate::expressions::format_state_name;
 use arrow::array::Array;
 use arrow::array::Decimal128Array;
+use arrow::array::Decimal256Array;
+use arrow::datatypes::i256;
+use arrow::datatypes::Decimal256Type;
 
 use super::moving_min_max;
 
@@ -183,6 +186,7 @@ impl AggregateExpr for Max {
 | Float32
 | Float64
 | Decimal128(_, _)
+| Decimal256(_, _)
 | Date32
 | Date64
 | Time32(_)
@@ -239,6 +243,9 @@ impl AggregateExpr for Max {
 Decimal128(_, _) => {
 instantiate_max_accumulator!(self, i128, Decimal128Type)
 }
+Decimal256(_, _) => {
+instantiate_max_accumulator!(self, i256, Decimal256Type)
+}
 
 // It would be nice to have a fast implementation for Strings as 
well
 // https://github.com/apache/arrow-datafusion/issues/6906
@@ -318,6 +325,16 @@ macro_rules! min_max_batch {
 scale
 )
 }
+DataType::Decimal256(precision, scale) => {
+typed_min_max_batch!(
+$VALUES,
+Decimal256Array,
+Decimal256,
+$OP,
+precision,
+scale
+)
+}
 // all types that have a natural order
 DataType::Float64 => {
 typed_min_max_batch!($VALUES, Float64Array, Float64, $OP)
@@ -522,6 +539,19 @@ macro_rules! min_max {
 );
 }
 }
+(
+lhs @ ScalarValue::Decimal256(lhsv, lhsp, lhss),
+rhs @ ScalarValue::Decimal256(rhsv, rhsp, rhss)
+) => {
+if lhsp.eq(rhsp) && lhss.eq(rhss) {
+typed_min_max!(lhsv, rhsv, Decimal256, $OP, lhsp, lhss)
+} else {
+return internal_err!(
+"MIN/MAX is not expected to receive scalars of 
incompatible types {:?}",
+(lhs, rhs)
+);
+}
+}
 (ScalarValue::Boolean(lhs), ScalarValue::Boolean(rhs)) => {
 typed_min_max!(lhs, rhs, Boolean, $OP)
 }
@@ -880,6 +910,7 @@ impl AggregateExpr for Min {
 | Float32
 | Float64
 | Decimal128(_, _)
+| Decimal256(_, _)
 | Date32
 | Date64
 | Time32(_)
@@ -935,6 +966,9 @@ impl AggregateExpr for Min {
 Decimal128(_, _) => {
 instantiate_min_accumulator!(self, i128, Decimal128Type)
 }
+Decimal256(_, _) => {
+instantiate_min_accumulator!(self, i256, Decimal256Type)
+}
 // This is only reached if groups_accumulator_supported is out of 
sync
 _ => internal_err!(
 "GroupsAccumulator not supported for min({})",
diff --git a/datafusion/sqllogictest/test_files/decimal.slt 
b/datafusion/sqllogictest/test_files/decimal.slt
index f968ffb90a..87a846c077 100644
--- a/datafusion/sqllogictest/test_files/decimal.slt
+++ b/datafusion/sqllogictest/test_files/decimal.slt
@@ -691,5 +691,15 @@ select arrow_typeof(avg(c1)), avg(c1) from 
decimal256_simple;
 
 Decimal256(54, 10) 0.36
 
+query TR
+select arrow_typeof(min(c1)), min(c1) from decimal256_simple where c4=false;
+
+Decimal256(50, 6) 0.2
+
+query TR
+select arrow_typeof(max(c1)), max(c1) from decimal256_simple where c4=false;
+
+Decimal256(50, 6) 0.5
+
 statement ok
 drop table decimal256_simple;



[arrow-datafusion] branch main updated: Support Decimal256 on AVG aggregate expression (#7853)

2023-10-18 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 634ed28fd7 Support Decimal256 on AVG aggregate expression (#7853)
634ed28fd7 is described below

commit 634ed28fd7a7aedcf48d86d8b578b5fe4e19081a
Author: Liang-Chi Hsieh 
AuthorDate: Wed Oct 18 22:31:33 2023 -0700

Support Decimal256 on AVG aggregate expression (#7853)

* More

* More

* More

* More

* Fix clippy
---
 datafusion/physical-expr/src/aggregate/average.rs | 94 +--
 datafusion/physical-expr/src/aggregate/utils.rs   | 59 --
 datafusion/sqllogictest/test_files/decimal.slt|  4 +-
 3 files changed, 109 insertions(+), 48 deletions(-)

diff --git a/datafusion/physical-expr/src/aggregate/average.rs 
b/datafusion/physical-expr/src/aggregate/average.rs
index 92c806f76f..91f2fb952d 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -21,6 +21,7 @@ use arrow::array::{AsArray, PrimitiveBuilder};
 use log::debug;
 
 use std::any::Any;
+use std::fmt::Debug;
 use std::sync::Arc;
 
 use crate::aggregate::groups_accumulator::accumulate::NullState;
@@ -33,15 +34,17 @@ use arrow::{
 array::{ArrayRef, UInt64Array},
 datatypes::Field,
 };
+use arrow_array::types::{Decimal256Type, DecimalType};
 use arrow_array::{
 Array, ArrowNativeTypeOp, ArrowNumericType, ArrowPrimitiveType, 
PrimitiveArray,
 };
+use arrow_buffer::{i256, ArrowNativeType};
 use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
 use datafusion_expr::type_coercion::aggregates::avg_return_type;
 use datafusion_expr::Accumulator;
 
 use super::groups_accumulator::EmitTo;
-use super::utils::Decimal128Averager;
+use super::utils::DecimalAverager;
 
 /// AVG aggregate expression
 #[derive(Debug, Clone)]
@@ -88,7 +91,19 @@ impl AggregateExpr for Avg {
 (
 Decimal128(sum_precision, sum_scale),
 Decimal128(target_precision, target_scale),
-) => Ok(Box::new(DecimalAvgAccumulator {
+) => Ok(Box::new(DecimalAvgAccumulator:: {
+sum: None,
+count: 0,
+sum_scale: *sum_scale,
+sum_precision: *sum_precision,
+target_precision: *target_precision,
+target_scale: *target_scale,
+})),
+
+(
+Decimal256(sum_precision, sum_scale),
+Decimal256(target_precision, target_scale),
+) => Ok(Box::new(DecimalAvgAccumulator:: {
 sum: None,
 count: 0,
 sum_scale: *sum_scale,
@@ -156,7 +171,7 @@ impl AggregateExpr for Avg {
 Decimal128(_sum_precision, sum_scale),
 Decimal128(target_precision, target_scale),
 ) => {
-let decimal_averager = Decimal128Averager::try_new(
+let decimal_averager = 
DecimalAveragertry_new(
 *sum_scale,
 *target_precision,
 *target_scale,
@@ -172,6 +187,27 @@ impl AggregateExpr for Avg {
 )))
 }
 
+(
+Decimal256(_sum_precision, sum_scale),
+Decimal256(target_precision, target_scale),
+) => {
+let decimal_averager = 
DecimalAveragertry_new(
+*sum_scale,
+*target_precision,
+*target_scale,
+)?;
+
+let avg_fn = move |sum: i256, count: u64| {
+decimal_averager.avg(sum, i256::from_usize(count as 
usize).unwrap())
+};
+
+Ok(Box::new(AvgGroupsAccumulatornew(
+_data_type,
+_data_type,
+avg_fn,
+)))
+}
+
 _ => not_impl_err!(
 "AvgGroupsAccumulator for ({} --> {})",
 self.input_data_type,
@@ -256,9 +292,8 @@ impl Accumulator for AvgAccumulator {
 }
 
 /// An accumulator to compute the average for decimals
-#[derive(Debug)]
-struct DecimalAvgAccumulator {
-sum: Option,
+struct DecimalAvgAccumulator {
+sum: Option,
 count: u64,
 sum_scale: i8,
 sum_precision: u8,
@@ -266,30 +301,46 @@ struct DecimalAvgAccumulator {
 target_scale: i8,
 }
 
-impl Accumulator for DecimalAvgAccumulator {
+impl Debug for DecimalAvgAccumulator {
+fn fmt(, f:  std::fmt::Formatter<'_>) -> std::fmt::Result {
+f.debug_struct("DecimalAvgAccumulator")
+.field("sum", )
+.field("count", )
+.field("sum_scale", _scale)
+ 

[arrow-datafusion] branch main updated: Add small column on empty projection (#7833)

2023-10-18 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 7acd8833cc Add small column on empty projection (#7833)
7acd8833cc is described below

commit 7acd8833cc5d03ba7643d4ae424553c7681ccce8
Author: Christoph Schulze 
AuthorDate: Wed Oct 18 13:06:51 2023 +0200

Add small column on empty projection (#7833)

* Find small column when projection is empty

* clippy

* fix comment

* fix avro.slt test

* use min_by

* clippy
---
 datafusion/core/tests/sql/explain_analyze.rs |   2 +-
 datafusion/optimizer/src/push_down_projection.rs | 201 ---
 datafusion/sqllogictest/test_files/avro.slt  |   4 +-
 datafusion/sqllogictest/test_files/json.slt  |   4 +-
 4 files changed, 179 insertions(+), 32 deletions(-)

diff --git a/datafusion/core/tests/sql/explain_analyze.rs 
b/datafusion/core/tests/sql/explain_analyze.rs
index c328f46be7..7238369f83 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -788,7 +788,7 @@ async fn explain_logical_plan_only() {
 "logical_plan",
 "Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]\
 \n  SubqueryAlias: t\
-\nProjection: column1\
+\nProjection: column2\
 \n  Values: (Utf8(\"a\"), Int64(1), Int64(100)), (Utf8(\"a\"), 
Int64(2), Int64(150))"
 ]];
 assert_eq!(expected, actual);
diff --git a/datafusion/optimizer/src/push_down_projection.rs 
b/datafusion/optimizer/src/push_down_projection.rs
index 6db4bb9ba4..839f6b5bb8 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -23,6 +23,7 @@ use crate::merge_projection::merge_projection;
 use crate::optimizer::ApplyOrder;
 use crate::push_down_filter::replace_cols_by_name;
 use crate::{OptimizerConfig, OptimizerRule};
+use arrow::datatypes::DataType;
 use arrow::error::Result as ArrowResult;
 use datafusion_common::ScalarValue::UInt8;
 use datafusion_common::{
@@ -148,8 +149,10 @@ impl OptimizerRule for PushDownProjection {
 {
 let mut used_columns: HashSet = HashSet::new();
 if projection_is_empty {
-used_columns
-
.insert(scan.projected_schema.fields()[0].qualified_column());
+let field = 
find_small_field(scan.projected_schema.fields()).ok_or(
+DataFusionError::Internal("Scan with empty 
schema".to_string()),
+)?;
+used_columns.insert(field.qualified_column());
 push_down_scan(_columns, scan, true)?
 } else {
 for expr in projection.expr.iter() {
@@ -161,10 +164,13 @@ impl OptimizerRule for PushDownProjection {
 }
 }
 LogicalPlan::Values(values) if projection_is_empty => {
-let first_col =
-Expr::Column(values.schema.fields()[0].qualified_column());
+let field = find_small_field(values.schema.fields()).ok_or(
+DataFusionError::Internal("Values with empty 
schema".to_string()),
+)?;
+let column = Expr::Column(field.qualified_column());
+
 LogicalPlan::Projection(Projection::try_new(
-vec![first_col],
+vec![column],
 Arc::new(child_plan.clone()),
 )?)
 }
@@ -423,7 +429,88 @@ pub fn collect_projection_expr(projection: ) -> 
HashMap
 .collect::>()
 }
 
-// Get the projection exprs from columns in the order of the schema
+/// Accumulate the memory size of a data type measured in bits.
+///
+/// Types with a variable size get assigned with a fixed size which is greater 
than most
+/// primitive types.
+///
+/// While traversing nested types, `nesting` is incremented on every level.
+fn nested_size(data_type: , nesting:  usize) -> usize {
+use DataType::*;
+if data_type.is_primitive() {
+return data_type.primitive_width().unwrap_or(1) * 8;
+}
+
+if data_type.is_nested() {
+*nesting += 1;
+}
+
+match data_type {
+Null => 0,
+Boolean => 1,
+Binary | Utf8 => 128,
+LargeBinary | LargeUtf8 => 256,
+FixedSizeBinary(bytes) => (*bytes * 8) as usize,
+// primitive types
+Int8
+| Int16
+| Int32
+| Int64
+| UInt8
+| UInt16
+| UInt32
+| UInt64
+| Float16
+| Float32
+| Float64
+| Timestamp(_, _)
+|

[arrow-datafusion] branch main updated: Fix precision loss when coercing date_part utf8 argument (#7846)

2023-10-17 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new d009d48ee9 Fix precision loss when coercing date_part utf8 argument 
(#7846)
d009d48ee9 is described below

commit d009d48ee927caae40dfd51c74c6893b69c0cd57
Author: Daniël Heres 
AuthorDate: Tue Oct 17 20:15:28 2023 +0200

Fix precision loss when coercing date_part utf8 argument (#7846)
---
 datafusion/core/tests/sql/expr.rs| 19 +++
 datafusion/expr/src/built_in_function.rs | 20 ++--
 2 files changed, 29 insertions(+), 10 deletions(-)

diff --git a/datafusion/core/tests/sql/expr.rs 
b/datafusion/core/tests/sql/expr.rs
index af33cfea65..1995a04015 100644
--- a/datafusion/core/tests/sql/expr.rs
+++ b/datafusion/core/tests/sql/expr.rs
@@ -717,6 +717,25 @@ async fn test_extract_date_part() -> Result<()> {
 "date_part('nanosecond', 
to_timestamp('2020-09-08T12:00:12.12345678+00:00'))",
 "1.212345678e10"
 );
+
+// Keep precision when coercing Utf8 to Timestamp
+test_expression!(
+"date_part('second', '2020-09-08T12:00:12.12345678+00:00')",
+"12.12345678"
+);
+test_expression!(
+"date_part('millisecond', '2020-09-08T12:00:12.12345678+00:00')",
+"12123.45678"
+);
+test_expression!(
+"date_part('microsecond', '2020-09-08T12:00:12.12345678+00:00')",
+"12123456.78"
+);
+test_expression!(
+"date_part('nanosecond', '2020-09-08T12:00:12.12345678+00:00')",
+"1.212345678e10"
+);
+
 Ok(())
 }
 
diff --git a/datafusion/expr/src/built_in_function.rs 
b/datafusion/expr/src/built_in_function.rs
index d7cfd9f420..f1af6e829b 100644
--- a/datafusion/expr/src/built_in_function.rs
+++ b/datafusion/expr/src/built_in_function.rs
@@ -1107,28 +1107,28 @@ impl BuiltinScalarFunction {
 }
 BuiltinScalarFunction::DatePart => Signature::one_of(
 vec![
-Exact(vec![Utf8, Date32]),
-Exact(vec![Utf8, Date64]),
-Exact(vec![Utf8, Timestamp(Second, None)]),
+Exact(vec![Utf8, Timestamp(Nanosecond, None)]),
 Exact(vec![
 Utf8,
-Timestamp(Second, Some(TIMEZONE_WILDCARD.into())),
+Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
 ]),
-Exact(vec![Utf8, Timestamp(Microsecond, None)]),
+Exact(vec![Utf8, Timestamp(Millisecond, None)]),
 Exact(vec![
 Utf8,
-Timestamp(Microsecond, Some(TIMEZONE_WILDCARD.into())),
+Timestamp(Millisecond, Some(TIMEZONE_WILDCARD.into())),
 ]),
-Exact(vec![Utf8, Timestamp(Millisecond, None)]),
+Exact(vec![Utf8, Timestamp(Microsecond, None)]),
 Exact(vec![
 Utf8,
-Timestamp(Millisecond, Some(TIMEZONE_WILDCARD.into())),
+Timestamp(Microsecond, Some(TIMEZONE_WILDCARD.into())),
 ]),
-Exact(vec![Utf8, Timestamp(Nanosecond, None)]),
+Exact(vec![Utf8, Timestamp(Second, None)]),
 Exact(vec![
 Utf8,
-Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
+Timestamp(Second, Some(TIMEZONE_WILDCARD.into())),
 ]),
+Exact(vec![Utf8, Date64]),
+Exact(vec![Utf8, Date32]),
 ],
 self.volatility(),
 ),



[arrow-datafusion] branch main updated: Refactor ScalarValue::new_primitive to return Result (#7830)

2023-10-17 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new efbd1043d4 Refactor ScalarValue::new_primitive to return Result (#7830)
efbd1043d4 is described below

commit efbd1043d427ebf1783d164c90e45aee1766f12e
Author: Eugene Marushchenko 
AuthorDate: Wed Oct 18 00:38:00 2023 +1000

Refactor ScalarValue::new_primitive to return Result (#7830)

Co-authored-by: Evgeny Maruschenko 
---
 datafusion/common/src/scalar.rs |  8 
 .../physical-expr/src/aggregate/bit_and_or_xor.rs   | 12 ++--
 datafusion/physical-expr/src/aggregate/median.rs|  6 +++---
 datafusion/physical-expr/src/aggregate/sum.rs   |  4 ++--
 datafusion/physical-expr/src/aggregate/sum_distinct.rs  | 17 +
 5 files changed, 24 insertions(+), 23 deletions(-)

diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 2d47b3e314..2c3dd4c5ca 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -744,7 +744,7 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
-/// Create a [`ScalarValue`] with the provided value and datatype
+/// Create a [`Result`] with the provided value and datatype
 ///
 /// # Panics
 ///
@@ -752,13 +752,13 @@ impl ScalarValue {
 pub fn new_primitive(
 a: Option,
 d: ,
-) -> Self {
+) -> Result {
 match a {
-None => d.try_into().unwrap(),
+None => d.try_into(),
 Some(v) => {
 let array = PrimitiveArraynew(vec![v].into(), None)
 .with_data_type(d.clone());
-Self::try_from_array(, 0).unwrap()
+Self::try_from_array(, 0)
 }
 }
 }
diff --git a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs 
b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
index d7934e79c3..6c97d62061 100644
--- a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
+++ b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
@@ -195,7 +195,7 @@ where
 }
 
 fn evaluate() -> Result {
-Ok(ScalarValue::new_primitive::(self.value, ::DATA_TYPE))
+ScalarValue::new_primitive::(self.value, ::DATA_TYPE)
 }
 
 fn size() -> usize {
@@ -356,7 +356,7 @@ where
 }
 
 fn evaluate() -> Result {
-Ok(ScalarValue::new_primitive::(self.value, ::DATA_TYPE))
+ScalarValue::new_primitive::(self.value, ::DATA_TYPE)
 }
 
 fn size() -> usize {
@@ -517,7 +517,7 @@ where
 }
 
 fn evaluate() -> Result {
-Ok(ScalarValue::new_primitive::(self.value, ::DATA_TYPE))
+ScalarValue::new_primitive::(self.value, ::DATA_TYPE)
 }
 
 fn size() -> usize {
@@ -638,11 +638,11 @@ where
 // 1. Stores aggregate state in `ScalarValue::List`
 // 2. Constructs `ScalarValue::List` state from distinct numeric 
stored in hash set
 let state_out = {
-let values: Vec = self
+let values = self
 .values
 .iter()
 .map(|x| ScalarValue::new_primitive::(Some(*x), 
::DATA_TYPE))
-.collect();
+.collect::>>()?;
 
 let arr = ScalarValue::new_list(, ::DATA_TYPE);
 vec![ScalarValue::List(arr)]
@@ -685,7 +685,7 @@ where
 acc = acc ^ *distinct_value;
 }
 let v = (!self.values.is_empty()).then_some(acc);
-Ok(ScalarValue::new_primitive::(v, ::DATA_TYPE))
+ScalarValue::new_primitive::(v, ::DATA_TYPE)
 }
 
 fn size() -> usize {
diff --git a/datafusion/physical-expr/src/aggregate/median.rs 
b/datafusion/physical-expr/src/aggregate/median.rs
index 477dcadcee..691b1c1752 100644
--- a/datafusion/physical-expr/src/aggregate/median.rs
+++ b/datafusion/physical-expr/src/aggregate/median.rs
@@ -146,11 +146,11 @@ impl std::fmt::Debug for 
MedianAccumulator {
 
 impl Accumulator for MedianAccumulator {
 fn state() -> Result> {
-let all_values: Vec = self
+let all_values = self
 .all_values
 .iter()
 .map(|x| ScalarValue::new_primitive::(Some(*x), 
_type))
-.collect();
+.collect::>>()?;
 
 let arr = ScalarValue::new_list(_values, _type);
 Ok(vec![ScalarValue::List(arr)])
@@ -188,7 +188,7 @@ impl Accumulator for 
MedianAccumulator {
 let (_, median, _) = d.select_nth_unstable_by(len / 2, cmp);
 Some(*median)
 };
-Ok(ScalarValue::new_primitive::(median, _type))
+ScalarValue::new_primitive::(median, _type)
 }
 
 fn size() -> usize {
diff --git a/datafusion/physical-expr/src/aggregate/sum.rs 
b/dataf

[arrow-datafusion] branch main updated: Avoid panics on error while encoding/decoding ListValue::Array as protobuf (#7837)

2023-10-17 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 2cd1706115 Avoid panics on error while encoding/decoding 
ListValue::Array as protobuf (#7837)
2cd1706115 is described below

commit 2cd170611547c0bb9e0c1788c40f3f006a7d1ec7
Author: Andrew Lamb 
AuthorDate: Tue Oct 17 07:47:53 2023 -0400

Avoid panics on error while encoding/decoding ListValue::Array as protobuf 
(#7837)
---
 datafusion/proto/src/logical_plan/from_proto.rs | 34 -
 datafusion/proto/src/logical_plan/to_proto.rs   | 39 ++---
 2 files changed, 37 insertions(+), 36 deletions(-)

diff --git a/datafusion/proto/src/logical_plan/from_proto.rs 
b/datafusion/proto/src/logical_plan/from_proto.rs
index b3873c01dd..c87882ca72 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -26,7 +26,7 @@ use crate::protobuf::{
 OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
 };
 use arrow::{
-buffer::{Buffer, MutableBuffer},
+buffer::Buffer,
 datatypes::{
 i256, DataType, Field, IntervalMonthDayNanoType, IntervalUnit, Schema, 
TimeUnit,
 UnionFields, UnionMode,
@@ -645,6 +645,7 @@ impl TryFrom<::ScalarValue> for ScalarValue {
 Value::Float32Value(v) => Self::Float32(Some(*v)),
 Value::Float64Value(v) => Self::Float64(Some(*v)),
 Value::Date32Value(v) => Self::Date32(Some(*v)),
+// ScalarValue::List is serialized using arrow IPC format
 Value::ListValue(scalar_list) => {
 let protobuf::ScalarListValue {
 ipc_message,
@@ -655,29 +656,36 @@ impl TryFrom<::ScalarValue> for ScalarValue {
 let schema: Schema = if let Some(schema_ref) = schema {
 schema_ref.try_into()?
 } else {
-return Err(Error::General("Unexpected 
schema".to_string()));
+return Err(Error::General(
+"Invalid schema while deserializing ScalarValue::List"
+.to_string(),
+));
 };
 
-let message = root_as_message(ipc_message.as_slice()).unwrap();
+let message = 
root_as_message(ipc_message.as_slice()).map_err(|e| {
+Error::General(format!(
+"Error IPC message while deserializing 
ScalarValue::List: {e}"
+))
+})?;
+let buffer = Buffer::from(arrow_data);
 
-// TODO: Add comment to why adding 0 before arrow_data.
-// This code is from 
https://github.com/apache/arrow-rs/blob/4320a753beaee0a1a6870c59ef46b59e88c9c323/arrow-ipc/src/reader.rs#L1670-L1674C45
-// Construct an unaligned buffer
-let mut buffer = MutableBuffer::with_capacity(arrow_data.len() 
+ 1);
-buffer.push(0_u8);
-buffer.extend_from_slice(arrow_data.as_slice());
-let b = Buffer::from(buffer).slice(1);
+let ipc_batch = message.header_as_record_batch().ok_or_else(|| 
{
+Error::General(
+"Unexpected message type deserializing 
ScalarValue::List"
+.to_string(),
+)
+})?;
 
-let ipc_batch = message.header_as_record_batch().unwrap();
 let record_batch = read_record_batch(
-,
+,
 ipc_batch,
 Arc::new(schema),
 ::default(),
 None,
 (),
 )
-.unwrap();
+.map_err(DataFusionError::ArrowError)
+.map_err(|e| e.context("Decoding ScalarValue::List Value"))?;
 let arr = record_batch.column(0);
 Self::List(arr.to_owned())
 }
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs 
b/datafusion/proto/src/logical_plan/to_proto.rs
index e80d60931c..125ced032e 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -56,13 +56,6 @@ use datafusion_expr::{
 pub enum Error {
 General(String),
 
-InconsistentListTyping(DataType, DataType),
-
-InconsistentListDesignated {
-value: ScalarValue,
-designated: DataType,
-},
-
 InvalidScalarValue(ScalarValue),
 
 InvalidScalarType(DataType),
@@ -80,18 +73,6 @@ impl std::fmt::Display for Error {
 fn fmt(, f:  std::fmt::Formatter) -> std::fmt::Result {
 match self {

[arrow-datafusion] branch main updated: [MINOR]:Do not introduce unnecessary repartition when row count is 1. (#7832)

2023-10-17 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 9aacdee138 [MINOR]:Do not introduce unnecessary repartition when row 
count is 1. (#7832)
9aacdee138 is described below

commit 9aacdee13881f25335d8ea323311949b9799e6d3
Author: Mustafa Akur <106137913+mustafasr...@users.noreply.github.com>
AuthorDate: Tue Oct 17 11:18:44 2023 +0300

[MINOR]:Do not introduce unnecessary repartition when row count is 1. 
(#7832)

* Initial commit

* Fix failing tests

* More idiomatic expressions

* Update tests, use batch size during partition benefit check

* Fix failing tests

* is_exact when row count is 1

-

Co-authored-by: Mehmet Ozan Kabak 
---
 datafusion/core/src/datasource/listing/table.rs|  13 +-
 .../src/physical_optimizer/enforce_distribution.rs |  34 +++--
 datafusion/core/tests/sql/order.rs |   8 +-
 datafusion/physical-plan/src/aggregates/mod.rs |  21 ++-
 datafusion/sqllogictest/test_files/aggregate.slt   |  20 +--
 datafusion/sqllogictest/test_files/copy.slt|   2 +-
 datafusion/sqllogictest/test_files/groupby.slt |  53 +---
 datafusion/sqllogictest/test_files/join.slt|  10 +-
 datafusion/sqllogictest/test_files/joins.slt   | 148 +++--
 datafusion/sqllogictest/test_files/options.slt |   6 +-
 datafusion/sqllogictest/test_files/select.slt  |   3 +-
 datafusion/sqllogictest/test_files/subquery.slt|  32 +++--
 .../sqllogictest/test_files/tpch/q15.slt.part  |  27 ++--
 datafusion/sqllogictest/test_files/union.slt   | 107 ---
 14 files changed, 270 insertions(+), 214 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 0646243669..6ef214c97f 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1615,10 +1615,12 @@ mod tests {
 
 #[tokio::test]
 async fn test_insert_into_append_new_json_files() -> Result<()> {
+let mut config_map: HashMap = HashMap::new();
+config_map.insert("datafusion.execution.batch_size".into(), 
"1".into());
 helper_test_append_new_files_to_table(
 FileType::JSON,
 FileCompressionType::UNCOMPRESSED,
-None,
+Some(config_map),
 )
 .await?;
 Ok(())
@@ -1637,10 +1639,12 @@ mod tests {
 
 #[tokio::test]
 async fn test_insert_into_append_new_csv_files() -> Result<()> {
+let mut config_map: HashMap = HashMap::new();
+config_map.insert("datafusion.execution.batch_size".into(), 
"1".into());
 helper_test_append_new_files_to_table(
 FileType::CSV,
 FileCompressionType::UNCOMPRESSED,
-None,
+Some(config_map),
 )
 .await?;
 Ok(())
@@ -1648,10 +1652,12 @@ mod tests {
 
 #[tokio::test]
 async fn test_insert_into_append_new_parquet_files_defaults() -> 
Result<()> {
+let mut config_map: HashMap = HashMap::new();
+config_map.insert("datafusion.execution.batch_size".into(), 
"1".into());
 helper_test_append_new_files_to_table(
 FileType::PARQUET,
 FileCompressionType::UNCOMPRESSED,
-None,
+Some(config_map),
 )
 .await?;
 Ok(())
@@ -1838,6 +1844,7 @@ mod tests {
 "datafusion.execution.parquet.write_batch_size".into(),
 "5".into(),
 );
+config_map.insert("datafusion.execution.batch_size".into(), 
"1".into());
 helper_test_append_new_files_to_table(
 FileType::PARQUET,
 FileCompressionType::UNCOMPRESSED,
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 9be566f10a..52525d1fc4 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1021,6 +1021,7 @@ fn add_hash_on_top(
 // until Repartition(Hash).
 dist_onward:  Option,
 input_idx: usize,
+repartition_beneficial_stats: bool,
 ) -> Result> {
 if n_target == input.output_partitioning().partition_count() && n_target 
== 1 {
 // In this case adding a hash repartition is unnecessary as the hash
@@ -1044,9 +1045,13 @@ fn add_hash_on_top(
 // - Usage of order preserving variants is not desirable (per the flag
 //   `config.optimizer.bounded_order_preserving_variants`).
 let should_preserve_or

[arrow-datafusion] branch main updated: Update zstd requirement from 0.12 to 0.13 (#7806)

2023-10-12 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 3ccbcfc49c Update zstd requirement from 0.12 to 0.13 (#7806)
3ccbcfc49c is described below

commit 3ccbcfc49cafffe5d25425ed56a281f74c6edae7
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
AuthorDate: Fri Oct 13 07:05:49 2023 +0200

Update zstd requirement from 0.12 to 0.13 (#7806)

* Update zstd requirement from 0.12 to 0.13

Updates the requirements on [zstd](https://github.com/gyscos/zstd-rs) to 
permit the latest version.
- [Release notes](https://github.com/gyscos/zstd-rs/releases)
- [Commits](https://github.com/gyscos/zstd-rs/compare/v0.12.0...v0.13.0)

---
updated-dependencies:
- dependency-name: zstd
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] 

* Update

-

Signed-off-by: dependabot[bot] 
Co-authored-by: dependabot[bot] 
<49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Liang-Chi Hsieh 
---
 datafusion-cli/Cargo.lock  | 49 +++---
 datafusion/core/Cargo.toml |  2 +-
 2 files changed, 34 insertions(+), 17 deletions(-)

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index f695e81d98..1c872c2848 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -112,7 +112,7 @@ dependencies = [
  "typed-builder",
  "uuid",
  "xz2",
- "zstd",
+ "zstd 0.12.4",
 ]
 
 [[package]]
@@ -370,8 +370,8 @@ dependencies = [
  "pin-project-lite",
  "tokio",
  "xz2",
- "zstd",
- "zstd-safe",
+ "zstd 0.12.4",
+ "zstd-safe 6.0.6",
 ]
 
 [[package]]
@@ -1141,7 +1141,7 @@ dependencies = [
  "url",
  "uuid",
  "xz2",
- "zstd",
+ "zstd 0.13.0",
 ]
 
 [[package]]
@@ -1232,7 +1232,7 @@ dependencies = [
  "hashbrown 0.14.1",
  "itertools",
  "log",
- "regex-syntax 0.8.0",
+ "regex-syntax 0.8.1",
 ]
 
 [[package]]
@@ -2319,9 +2319,9 @@ checksum = 
"ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
 
 [[package]]
 name = "ordered-float"
-version = "2.10.0"
+version = "2.10.1"
 source = "registry+https://github.com/rust-lang/crates.io-index;
-checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87"
+checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c"
 dependencies = [
  "num-traits",
 ]
@@ -2392,7 +2392,7 @@ dependencies = [
  "thrift",
  "tokio",
  "twox-hash",
- "zstd",
+ "zstd 0.12.4",
 ]
 
 [[package]]
@@ -2681,7 +2681,7 @@ dependencies = [
  "aho-corasick",
  "memchr",
  "regex-automata",
- "regex-syntax 0.8.0",
+ "regex-syntax 0.8.1",
 ]
 
 [[package]]
@@ -2692,7 +2692,7 @@ checksum = 
"465c6fc0621e4abc4187a2bda0937bfd4f722c2730b29562e19689ea796c9a4b"
 dependencies = [
  "aho-corasick",
  "memchr",
- "regex-syntax 0.8.0",
+ "regex-syntax 0.8.1",
 ]
 
 [[package]]
@@ -2709,9 +2709,9 @@ checksum = 
"dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da"
 
 [[package]]
 name = "regex-syntax"
-version = "0.8.0"
+version = "0.8.1"
 source = "registry+https://github.com/rust-lang/crates.io-index;
-checksum = "c3cbb081b9784b07cceb8824c8583f86db4814d172ab043f3c23f7dc600bf83d"
+checksum = "56d84fdd47036b038fc80dd333d10b6aab10d5d31f4a366e20014def75328d33"
 
 [[package]]
 name = "reqwest"
@@ -3933,7 +3933,16 @@ version = "0.12.4"
 source = "registry+https://github.com/rust-lang/crates.io-index;
 checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c"
 dependencies = [
- "zstd-safe",
+ "zstd-safe 6.0.6",
+]
+
+[[package]]
+name = "zstd"
+version = "0.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index;
+checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110"
+dependencies = [
+ "zstd-safe 7.0.0",
 ]
 
 [[package]]
@@ -3946,13 +3955,21 @@ dependencies = [
  "zstd-sys",
 ]
 
+[[package]]
+name = "zstd-safe"
+version = "7.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index;
+checksum = "43747c7422e2924c11144d5229878b98180ef8b06cca4ab5af37afc8a8d8ea3e"
+dependencies = [
+ "zstd-sys",
+]
+
 [[package]]
 name = "zstd-sys&qu

[arrow-datafusion] branch main updated: `DataSink` additions (#7778)

2023-10-10 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new d2cc4d253e `DataSink` additions (#7778)
d2cc4d253e is described below

commit d2cc4d253e13f32ac95545ae79a3cb2d4d59de78
Author: Daniël Heres 
AuthorDate: Tue Oct 10 14:55:18 2023 +0200

`DataSink` additions (#7778)

* File sink additions

* Fmt

* Clippy

* Update datafusion/physical-plan/src/insert.rs

Co-authored-by: Andrew Lamb 

* Feedback

* Fmt

-

Co-authored-by: Andrew Lamb 
---
 datafusion/core/src/datasource/file_format/csv.rs   |  9 +
 datafusion/core/src/datasource/file_format/json.rs  |  9 +
 .../core/src/datasource/file_format/parquet.rs  |  9 +
 datafusion/core/src/datasource/memory.rs|  9 +
 datafusion/physical-plan/src/insert.rs  | 21 +
 5 files changed, 57 insertions(+)

diff --git a/datafusion/core/src/datasource/file_format/csv.rs 
b/datafusion/core/src/datasource/file_format/csv.rs
index 4c625b7ed7..e77382ad9c 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -33,6 +33,7 @@ use datafusion_physical_expr::{PhysicalExpr, 
PhysicalSortRequirement};
 
 use async_trait::async_trait;
 use bytes::{Buf, Bytes};
+use datafusion_physical_plan::metrics::MetricsSet;
 use futures::stream::BoxStream;
 use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
 use object_store::{delimited::newline_delimited_stream, ObjectMeta, 
ObjectStore};
@@ -484,6 +485,14 @@ impl CsvSink {
 
 #[async_trait]
 impl DataSink for CsvSink {
+fn as_any() ->  Any {
+self
+}
+
+fn metrics() -> Option {
+None
+}
+
 async fn write_all(
 ,
 data: Vec,
diff --git a/datafusion/core/src/datasource/file_format/json.rs 
b/datafusion/core/src/datasource/file_format/json.rs
index 6c260b9802..fa8fb5a723 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -25,6 +25,7 @@ use datafusion_common::DataFusionError;
 use datafusion_common::FileType;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::PhysicalSortRequirement;
+use datafusion_physical_plan::metrics::MetricsSet;
 use rand::distributions::Alphanumeric;
 use rand::distributions::DistString;
 use std::fmt;
@@ -276,6 +277,14 @@ impl JsonSink {
 
 #[async_trait]
 impl DataSink for JsonSink {
+fn as_any() ->  Any {
+self
+}
+
+fn metrics() -> Option {
+None
+}
+
 async fn write_all(
 ,
 data: Vec,
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs 
b/datafusion/core/src/datasource/file_format/parquet.rs
index 062ec1329d..d946bfb0b9 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -17,6 +17,7 @@
 
 //! Parquet format abstractions
 
+use datafusion_physical_plan::metrics::MetricsSet;
 use parquet::column::writer::ColumnCloseResult;
 use parquet::file::writer::SerializedFileWriter;
 use rand::distributions::DistString;
@@ -757,6 +758,14 @@ impl ParquetSink {
 
 #[async_trait]
 impl DataSink for ParquetSink {
+fn as_any() ->  Any {
+self
+}
+
+fn metrics() -> Option {
+None
+}
+
 async fn write_all(
 ,
 mut data: Vec,
diff --git a/datafusion/core/src/datasource/memory.rs 
b/datafusion/core/src/datasource/memory.rs
index 2766e73d33..ba99a2b695 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -17,6 +17,7 @@
 
 //! [`MemTable`] for querying `Vec` by DataFusion.
 
+use datafusion_physical_plan::metrics::MetricsSet;
 use futures::StreamExt;
 use log::debug;
 use std::any::Any;
@@ -259,6 +260,14 @@ impl MemSink {
 
 #[async_trait]
 impl DataSink for MemSink {
+fn as_any() ->  Any {
+self
+}
+
+fn metrics() -> Option {
+None
+}
+
 async fn write_all(
 ,
 mut data: Vec,
diff --git a/datafusion/physical-plan/src/insert.rs 
b/datafusion/physical-plan/src/insert.rs
index a7b0d32c8e..bff20e85b7 100644
--- a/datafusion/physical-plan/src/insert.rs
+++ b/datafusion/physical-plan/src/insert.rs
@@ -35,6 +35,7 @@ use std::any::Any;
 use std::fmt::Debug;
 use std::sync::Arc;
 
+use crate::metrics::MetricsSet;
 use crate::stream::RecordBatchStreamAdapter;
 use datafusion_common::{exec_err, internal_err, DataFusionError};
 use datafusion_execution::TaskContext;
@@ -46,6 +47,16 @@ use datafusion_execution::TaskContext;
 /// output.
 #[async_trait]
 pub trait DataSink: DisplayAs + Debug + Send + Sync {
+/// Returns the data sink as [`Any`](std::

[arrow-datafusion] branch main updated: Minor: improve documentation to `stagger_batch` (#7754)

2023-10-09 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 6685583dfc Minor: improve documentation to `stagger_batch` (#7754)
6685583dfc is described below

commit 6685583dfccf56967cf834d55846447263ec148d
Author: Andrew Lamb 
AuthorDate: Mon Oct 9 02:44:23 2023 -0400

Minor: improve documentation to `stagger_batch` (#7754)
---
 test-utils/src/lib.rs | 14 --
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs
index dfd8782751..e3c96d16ee 100644
--- a/test-utils/src/lib.rs
+++ b/test-utils/src/lib.rs
@@ -70,13 +70,23 @@ pub fn add_empty_batches(
 }
 
 /// "stagger" batches: split the batches into random sized batches
+///
+/// For example, if the input batch has 1000 rows, [`stagger_batch`] might 
return
+/// multiple batches
+/// ```text
+/// [
+///   RecordBatch(123 rows),
+///   RecordBatch(234 rows),
+///   RecordBatch(634 rows),
+/// ]
+/// ```
 pub fn stagger_batch(batch: RecordBatch) -> Vec {
 let seed = 42;
 stagger_batch_with_seed(batch, seed)
 }
 
-/// "stagger" batches: split the batches into random sized batches
-/// using the specified value for a rng seed
+/// "stagger" batches: split the batches into random sized batches using the
+/// specified value for a rng seed. See [`stagger_batch`] for more detail.
 pub fn stagger_batch_with_seed(batch: RecordBatch, seed: u64) -> 
Vec {
 let mut batches = vec![];
 



[arrow-ballista] branch update_df updated: Update dependencies some more

2023-10-05 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch update_df
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/update_df by this push:
 new e78558ed Update dependencies some more
e78558ed is described below

commit e78558edbadd97b11f422cd76e80489f4c3043bd
Author: Daniël Heres 
AuthorDate: Thu Oct 5 15:58:27 2023 +0200

Update dependencies some more
---
 ballista/core/Cargo.toml  | 2 +-
 ballista/scheduler/Cargo.toml | 4 ++--
 examples/Cargo.toml   | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 8a00c078..7db74072 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -67,7 +67,7 @@ once_cell = "1.9.0"
 parking_lot = "0.12"
 parse_arg = "0.1.3"
 prost = "0.12"
-prost-types = "0.11"
+prost-types = "0.12"
 rand = "0.8"
 serde = { version = "1", features = ["derive"] }
 sqlparser = { workspace = true }
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index b166c65c..d22bf7b8 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -53,7 +53,7 @@ configure_me = { workspace = true }
 dashmap = "5.4.0"
 datafusion = { workspace = true }
 datafusion-proto = { workspace = true }
-etcd-client = { version = "0.11", optional = true }
+etcd-client = { version = "0.12", optional = true }
 flatbuffers = { version = "23.5.26" }
 futures = "0.3"
 graphviz-rust = "0.6.1"
@@ -68,7 +68,7 @@ parking_lot = "0.12"
 parse_arg = "0.1.3"
 prometheus = { version = "0.13", features = ["process"], optional = true }
 prost = "0.12"
-prost-types = { version = "0.11.0" }
+prost-types = { version = "0.12.0" }
 rand = "0.8"
 serde = { version = "1", features = ["derive"] }
 sled_package = { package = "sled", version = "0.34", optional = true }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 01e8a3f8..15d91d41 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -46,4 +46,4 @@ tokio = { version = "1.0", features = [
 "sync",
 "parking_lot",
 ] }
-tonic = "0.9"
+tonic = "0.10"



[arrow-ballista] branch update_df updated: Update DataFusion to latest

2023-10-05 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch update_df
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/update_df by this push:
 new dc2e3cfa Update DataFusion to latest
dc2e3cfa is described below

commit dc2e3cfaec71db1344c4288b12732775ece836d2
Author: Daniël Heres 
AuthorDate: Thu Oct 5 15:53:20 2023 +0200

Update DataFusion to latest
---
 Cargo.toml | 18 
 ballista-cli/src/main.rs   |  9 
 ballista/cache/Cargo.toml  |  2 +-
 ballista/client/src/context.rs |  2 +-
 ballista/core/Cargo.toml   |  4 +-
 ballista/core/src/serde/generated/ballista.rs  | 51 ++
 ballista/core/src/serde/scheduler/to_proto.rs  | 10 -
 ballista/core/src/utils.rs |  6 +--
 ballista/executor/Cargo.toml   |  2 +-
 ballista/executor/src/executor_process.rs  |  3 ++
 ballista/scheduler/Cargo.toml  |  6 +--
 ballista/scheduler/src/flight_sql.rs   | 19 
 .../scheduler/src/state/execution_graph_dot.rs |  4 +-
 ballista/scheduler/src/state/session_manager.rs|  2 +-
 ballista/scheduler/src/test_utils.rs   | 14 +++---
 benchmarks/src/bin/nyctaxi.rs  |  2 +-
 benchmarks/src/bin/tpch.rs | 10 ++---
 examples/Cargo.toml|  2 +-
 18 files changed, 100 insertions(+), 66 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index f8e46de5..d51f26ee 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,18 +29,18 @@ members = [
 resolver = "2"
 
 [workspace.dependencies]
-arrow = { version = "46.0.0" }
-arrow-flight = { version = "46.0.0", features = ["flight-sql-experimental"] }
-arrow-schema = { version = "46.0.0", default-features = false }
+arrow = { version = "47.0.0" }
+arrow-flight = { version = "47.0.0", features = ["flight-sql-experimental"] }
+arrow-schema = { version = "47.0.0", default-features = false }
 configure_me = { version = "0.4.0" }
 configure_me_codegen = { version = "0.4.4" }
-datafusion = "31.0.0"
-datafusion-cli = "31.0.0"
-datafusion-proto = "31.0.0"
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git;, 
rev="1cf808d0e081221cc31b5f92fc414de7c76cf5c4" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion.git;, 
rev="1cf808d0e081221cc31b5f92fc414de7c76cf5c4" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git;, 
rev="1cf808d0e081221cc31b5f92fc414de7c76cf5c4" }
 object_store = "0.7.0"
-sqlparser = "0.37.0"
-tonic = { version = "0.9" }
-tonic-build = { version = "0.9", default-features = false, features = [
+sqlparser = "0.38.0"
+tonic = { version = "0.10" }
+tonic-build = { version = "0.10", default-features = false, features = [
 "transport",
 "prost"
 ] }
diff --git a/ballista-cli/src/main.rs b/ballista-cli/src/main.rs
index 3f8f9ba8..66c01441 100644
--- a/ballista-cli/src/main.rs
+++ b/ballista-cli/src/main.rs
@@ -23,6 +23,7 @@ use ballista_cli::{
 exec, print_format::PrintFormat, print_options::PrintOptions, 
BALLISTA_CLI_VERSION,
 };
 use clap::Parser;
+use datafusion_cli::print_options::MaxRows;
 use mimalloc::MiMalloc;
 
 #[global_allocator]
@@ -82,6 +83,13 @@ struct Args {
 #[clap(long, help = "Ballista scheduler port")]
 port: Option,
 
+#[clap(
+long,
+help = "The max number of rows to display for 'Table' 
format\n[default: 40] [possible values: numbers(0/10/...), inf(no limit)]",
+default_value = "40"
+)]
+maxrows: MaxRows,
+
 #[clap(
 short,
 long,
@@ -133,6 +141,7 @@ pub async fn main() -> Result<()> {
 let mut print_options = PrintOptions {
 format: args.format,
 quiet: args.quiet,
+maxrows: args.maxrows,
 };
 
 let files = args.file;
diff --git a/ballista/cache/Cargo.toml b/ballista/cache/Cargo.toml
index e8ad7552..80237ce3 100644
--- a/ballista/cache/Cargo.toml
+++ b/ballista/cache/Cargo.toml
@@ -23,7 +23,7 @@ edition = "2021"
 # See more keys and their definitions at 
https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-async-trait = "0.1.64"
+async-trait = "0.1.73"
 futures = "0.3"
 hashbrown = "0.14"
 log = "0.4"
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index a0671acf..76c8d439 100644
--- a/ballista/client/src/context.r

[arrow-ballista] branch update_df created (now 1063e0a7)

2023-10-05 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch update_df
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


  at 1063e0a7 Upgrade datafusion to 31.0.0 (#878)

No new revisions were added by this update.



[arrow-datafusion] branch main updated (0408c2b159 -> 1cf808d0e0)

2023-10-05 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


from 0408c2b159 Rename  `bounded_order_preserving_variants` config to 
`prefer_exising_sort` and update docs (#7723)
 add 1cf808d0e0 Optimize "ORDER BY + LIMIT" queries for speed / memory with 
special TopK operator (#7721)

No new revisions were added by this update.

Summary of changes:
 datafusion/physical-plan/src/lib.rs|   2 +
 datafusion/physical-plan/src/sorts/sort.rs |  79 ++-
 datafusion/physical-plan/src/topk/mod.rs   | 644 +
 datafusion/sqllogictest/test_files/decimal.slt |  15 +-
 datafusion/sqllogictest/test_files/topk.slt| 232 +
 datafusion/sqllogictest/test_files/window.slt  | 111 ++---
 6 files changed, 996 insertions(+), 87 deletions(-)
 create mode 100644 datafusion/physical-plan/src/topk/mod.rs
 create mode 100644 datafusion/sqllogictest/test_files/topk.slt



[arrow-datafusion] branch main updated (2ab0c0005f -> e97ed6650e)

2023-10-02 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


from 2ab0c0005f Minor: Add comment explaining why verify benchmark results 
uses release mode (#7712)
 add e97ed6650e Support all the codecs supported by Avro (#7718)

No new revisions were added by this update.

Summary of changes:
 datafusion/common/Cargo.toml|   2 +-
 datafusion/sqllogictest/test_files/avro.slt | 124 
 testing |   2 +-
 3 files changed, 126 insertions(+), 2 deletions(-)



[arrow-datafusion] branch main updated: Update Default Parquet Write Compression (#7692)

2023-09-29 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 692ea24357 Update Default Parquet Write Compression (#7692)
692ea24357 is described below

commit 692ea24357d32b1242c476f0ed33498c815ac921
Author: Devin D'Angelo 
AuthorDate: Sat Sep 30 01:22:52 2023 -0400

Update Default Parquet Write Compression (#7692)

* update compression default

* fix tests

-

Co-authored-by: Andrew Lamb 
---
 datafusion/common/src/config.rs   | 2 +-
 datafusion/sqllogictest/test_files/information_schema.slt | 2 +-
 docs/source/user-guide/configs.md | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index b34c64ff88..261c2bf435 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -307,7 +307,7 @@ config_namespace! {
 /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
 /// These values are not case sensitive. If NULL, uses
 /// default parquet writer setting
-pub compression: Option, default = None
+pub compression: Option, default = Some("zstd(3)".into())
 
 /// Sets if dictionary encoding is enabled. If NULL, uses
 /// default parquet writer setting
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt 
b/datafusion/sqllogictest/test_files/information_schema.slt
index f909010216..12aa9089a0 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -156,7 +156,7 @@ datafusion.execution.parquet.bloom_filter_enabled false
 datafusion.execution.parquet.bloom_filter_fpp NULL
 datafusion.execution.parquet.bloom_filter_ndv NULL
 datafusion.execution.parquet.column_index_truncate_length NULL
-datafusion.execution.parquet.compression NULL
+datafusion.execution.parquet.compression zstd(3)
 datafusion.execution.parquet.created_by datafusion
 datafusion.execution.parquet.data_page_row_count_limit 18446744073709551615
 datafusion.execution.parquet.data_pagesize_limit 1048576
diff --git a/docs/source/user-guide/configs.md 
b/docs/source/user-guide/configs.md
index 7fe229b4d3..638ac5a36b 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -58,7 +58,7 @@ Environment variables are read during `SessionConfig` 
initialisation so they mus
 | datafusion.execution.parquet.data_pagesize_limit   | 1048576 
  | Sets best effort maximum size of data page in bytes 




  [...]
 | datafusion.execution.parquet.write_batch_size  | 1024
  | Sets write_batch_size in bytes  




  [...]
 | datafusion.execution.parquet.writer_version| 1.0 
  | Sets parquet writer version valid values are "1.0" and "2.0"




  [...]
-| datafusion.execution.parquet.compression   | NULL
  | Sets default parquet compression codec Valid values are: 
uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and 
lz4_raw. These values are not case sensitive. If NULL, uses default parquet 
writer setting  

[...]
+| datafusion.execution.parquet.compression   | zstd(3) 
  | Sets default parquet compression codec Valid values are: 
uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and 
lz4_raw. These values are not case sensitive. If NULL, uses d

[arrow-datafusion] branch main updated: Fix bug in simplify expressions (#214) (#7699)

2023-09-29 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 2abacf4a07 Fix bug in simplify expressions (#214) (#7699)
2abacf4a07 is described below

commit 2abacf4a070a8cb48ac08da73d5e77331652903d
Author: Daniël Heres 
AuthorDate: Fri Sep 29 15:24:57 2023 +0200

Fix bug in simplify expressions (#214) (#7699)

Co-authored-by: Dan Harris <1327726+thinkharder...@users.noreply.github.com>
---
 .../src/simplify_expressions/simplify_exprs.rs | 51 --
 1 file changed, 48 insertions(+), 3 deletions(-)

diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs 
b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index 35a698b709..355d556d5d 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -65,10 +65,13 @@ impl SimplifyExpressions {
 ) -> Result {
 let schema = if !plan.inputs().is_empty() {
 DFSchemaRef::new(merge_schema(plan.inputs()))
-} else if let LogicalPlan::TableScan(_) = plan {
+} else if let LogicalPlan::TableScan(scan) = plan {
 // When predicates are pushed into a table scan, there needs to be
 // a schema to resolve the fields against.
-Arc::clone(plan.schema())
+Arc::new(DFSchema::try_from_qualified_schema(
+_name,
+(),
+)?)
 } else {
 Arc::new(DFSchema::empty())
 };
@@ -111,7 +114,7 @@ mod tests {
 use crate::simplify_expressions::utils::for_test::{
 cast_to_int64_expr, now_expr, to_timestamp_expr,
 };
-use crate::test::test_table_scan_with_name;
+use crate::test::{assert_fields_eq, test_table_scan_with_name};
 
 use super::*;
 use arrow::datatypes::{DataType, Field, Schema};
@@ -174,6 +177,48 @@ mod tests {
 Ok(())
 }
 
+#[test]
+fn test_simplify_table_full_filter_in_scan() -> Result<()> {
+let fields = vec![
+Field::new("a", DataType::UInt32, false),
+Field::new("b", DataType::UInt32, false),
+Field::new("c", DataType::UInt32, false),
+];
+
+let schema = Schema::new(fields);
+
+let table_scan = table_scan_with_filters(
+Some("test"),
+,
+Some(vec![0]),
+vec![col("b").is_not_null()],
+)?
+.build()?;
+assert_eq!(1, table_scan.schema().fields().len());
+assert_fields_eq(_scan, vec!["a"]);
+
+let expected = "TableScan: test projection=[a], 
full_filters=[Boolean(true) AS b IS NOT NULL]";
+
+assert_optimized_plan_eq(_scan, expected)
+}
+
+#[test]
+fn test_simplify_filter_pushdown() -> Result<()> {
+let table_scan = test_table_scan();
+let plan = LogicalPlanBuilder::from(table_scan)
+.project(vec![col("a")])?
+.filter(and(col("b").gt(lit(1)), col("b").gt(lit(1?
+.build()?;
+
+assert_optimized_plan_eq(
+,
+"\
+   Filter: test.b > Int32(1)\
+\n  Projection: test.a\
+\nTableScan: test",
+)
+}
+
 #[test]
 fn test_simplify_optimized_plan() -> Result<()> {
 let table_scan = test_table_scan();



[arrow-datafusion] branch main updated (236893f452 -> c6d29e7854)

2023-09-29 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


from 236893f452 Minor: Improve  `TableProviderFilterPushDown` docs (#7685)
 add c6d29e7854 FIX:  Test timestamp with table (#7701)

No new revisions were added by this update.

Summary of changes:
 datafusion/core/tests/sql/timestamp.rs | 31 ++-
 1 file changed, 18 insertions(+), 13 deletions(-)



[arrow-datafusion] branch main updated: Minor: Improve `TableProviderFilterPushDown` docs (#7685)

2023-09-29 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new 236893f452 Minor: Improve  `TableProviderFilterPushDown` docs (#7685)
236893f452 is described below

commit 236893f452f39d7236b653e03d8f26c9810221ac
Author: Andrew Lamb 
AuthorDate: Fri Sep 29 03:19:27 2023 -0400

Minor: Improve  `TableProviderFilterPushDown` docs (#7685)
---
 datafusion/expr/src/table_source.rs | 14 +++---
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/datafusion/expr/src/table_source.rs 
b/datafusion/expr/src/table_source.rs
index b83ce77813..94f26d9158 100644
--- a/datafusion/expr/src/table_source.rs
+++ b/datafusion/expr/src/table_source.rs
@@ -30,14 +30,14 @@ use std::any::Any;
 pub enum TableProviderFilterPushDown {
 /// The expression cannot be used by the provider.
 Unsupported,
-/// The expression can be used to help minimise the data retrieved,
-/// but the provider cannot guarantee that all returned tuples
-/// satisfy the filter. The Filter plan node containing this expression
-/// will be preserved.
+/// The expression can be used to reduce the data retrieved,
+/// but the provider cannot guarantee it will omit all tuples that
+/// may be filtered. In this case, DataFusion will apply an additional
+/// `Filter` operation after the scan to ensure all rows are filtered 
correctly.
 Inexact,
-/// The provider guarantees that all returned data satisfies this
-/// filter expression. The Filter plan node containing this expression
-/// will be removed.
+/// The provider **guarantees** that it will omit **all** tuples that are
+/// filtered by the filter expression. This is the fastest option, if 
available
+/// as DataFusion will not apply additional filtering.
 Exact,
 }
 



[arrow-datafusion] branch main updated (26a36024c2 -> 4b2b7dcfc6)

2023-09-28 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


from 26a36024c2 Move window analysis to the window method (#7672)
 add 4b2b7dcfc6 Don't add filters to used columns (#7670)

No new revisions were added by this update.

Summary of changes:
 datafusion/core/src/dataframe.rs   |   9 +-
 datafusion/core/src/datasource/view.rs |  27 ++--
 datafusion/expr/src/logical_plan/builder.rs|  11 +-
 datafusion/optimizer/src/push_down_filter.rs   | 175 +++--
 datafusion/optimizer/src/push_down_projection.rs   |  34 +++-
 .../src/simplify_expressions/simplify_exprs.rs |   3 +-
 .../optimizer/src/unwrap_cast_in_comparison.rs |   8 +-
 7 files changed, 127 insertions(+), 140 deletions(-)



[arrow-site] branch main updated: MINOR: [WEBSITE] Update affiliation for Daniël Heres (#411)

2023-09-26 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-site.git


The following commit(s) were added to refs/heads/main by this push:
 new 5e2c4a8e696 MINOR: [WEBSITE] Update affiliation for Daniël Heres (#411)
5e2c4a8e696 is described below

commit 5e2c4a8e6964aae31344812152a7b045d32570f7
Author: Andrew Lamb 
AuthorDate: Tue Sep 26 17:50:00 2023 -0400

MINOR: [WEBSITE] Update affiliation for Daniël Heres (#411)

cc @Dandandan  -- is this correct?
---
 _data/committers.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/_data/committers.yml b/_data/committers.yml
index 962fcd6c909..9415ea7176b 100644
--- a/_data/committers.yml
+++ b/_data/committers.yml
@@ -54,7 +54,7 @@
 - name: Daniël Heres
   role: PMC
   alias: dheres
-  affiliation: GoDataDriven
+  affiliation: Coralogix
 - name: David Li
   role: PMC
   alias: lidavidm



[arrow-datafusion] branch main updated: feat: Parallel collecting parquet files statistics #7573 (#7595)

2023-09-21 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new c7347ce578 feat: Parallel collecting parquet files statistics #7573 
(#7595)
c7347ce578 is described below

commit c7347ce578d4d8ce282e6e91d41642f857580354
Author: Hengfei Yang 
AuthorDate: Thu Sep 21 01:09:55 2023 -0500

feat: Parallel collecting parquet files statistics #7573 (#7595)

* feat: parallel collecting parquet files statistics #7573

* fix: cargo clippy format

* docs: add doc for execution.meta_fetch_concurrency

* feat: change the default value to 32 for execution.meta_fetch_concurrency
---
 datafusion/common/src/config.rs|  3 +
 .../core/src/datasource/file_format/parquet.rs |  5 +-
 datafusion/core/src/datasource/listing/table.rs| 64 --
 .../sqllogictest/test_files/information_schema.slt |  1 +
 docs/source/user-guide/configs.md  |  1 +
 5 files changed, 41 insertions(+), 33 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index c3f0861f29..b34c64ff88 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -251,6 +251,9 @@ config_namespace! {
 /// and sorted in a single RecordBatch rather than sorted in
 /// batches and merged.
 pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
+
+/// Number of files to read in parallel when inferring schema and 
statistics
+pub meta_fetch_concurrency: usize, default = 32
 }
 }
 
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs 
b/datafusion/core/src/datasource/file_format/parquet.rs
index c645dc20da..d7234a5375 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -70,9 +70,6 @@ use crate::physical_plan::{
 Statistics,
 };
 
-/// The number of files to read in parallel when inferring schema
-const SCHEMA_INFERENCE_CONCURRENCY: usize = 32;
-
 /// The Apache Parquet `FileFormat` implementation
 ///
 /// Note it is recommended these are instead configured on the 
[`ConfigOptions`]
@@ -176,7 +173,7 @@ impl FileFormat for ParquetFormat {
 let schemas: Vec<_> = futures::stream::iter(objects)
 .map(|object| fetch_schema(store.as_ref(), object, 
self.metadata_size_hint))
 .boxed() // Workaround 
https://github.com/rust-lang/rust/issues/64552
-.buffered(SCHEMA_INFERENCE_CONCURRENCY)
+.buffered(state.config_options().execution.meta_fetch_concurrency)
 .try_collect()
 .await?;
 
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 1344c417a9..51844e6928 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -937,38 +937,44 @@ impl ListingTable {
 let file_list = stream::iter(file_list).flatten();
 
 // collect the statistics if required by the config
-let files = file_list.then(|part_file| async {
-let part_file = part_file?;
-let mut statistics_result = Statistics::default();
-if self.options.collect_stat {
-let statistics_cache = self.collected_statistics.clone();
-match statistics_cache.get_with_extra(
-_file.object_meta.location,
-_file.object_meta,
-) {
-Some(statistics) => statistics_result = 
statistics.as_ref().clone(),
-None => {
-let statistics = self
-.options
-.format
-.infer_stats(
-ctx,
-,
-self.file_schema.clone(),
+let files = file_list
+.map(|part_file| async {
+let part_file = part_file?;
+let mut statistics_result = Statistics::default();
+if self.options.collect_stat {
+let statistics_cache = self.collected_statistics.clone();
+match statistics_cache.get_with_extra(
+_file.object_meta.location,
+_file.object_meta,
+) {
+Some(statistics) => {
+statistics_result = statistics.as_ref().clone()
+}
+None => {
+let statistics = self
+.options
+.format
+  

[arrow-ballista] branch main updated (479208b0 -> 1063e0a7)

2023-09-19 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


from 479208b0 refactor: port get_scan_files to Ballista (#877)
 add 1063e0a7 Upgrade datafusion to 31.0.0 (#878)

No new revisions were added by this update.

Summary of changes:
 Cargo.toml   | 16 
 ballista/client/src/context.rs   |  2 ++
 ballista/core/src/cache_layer/mod.rs | 12 ++--
 ballista/scheduler/src/flight_sql.rs |  4 ++--
 benchmarks/src/bin/tpch.rs   |  2 ++
 5 files changed, 20 insertions(+), 16 deletions(-)



[arrow-datafusion] branch main updated: [Minor]: Produce better plan when group by contains all of the ordering requirements (#7542)

2023-09-13 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
 new cde74016e9 [Minor]: Produce better plan when group by contains all of 
the ordering requirements (#7542)
cde74016e9 is described below

commit cde74016e930ffd9c55eed403b84bcd026f38d0f
Author: Mustafa Akur <106137913+mustafasr...@users.noreply.github.com>
AuthorDate: Wed Sep 13 23:50:21 2023 +0300

[Minor]: Produce better plan when group by contains all of the ordering 
requirements (#7542)

* Convert partial + final to single aggregate in two tests

* Address reviews

* Use any instead of loop
---
 .../core/src/physical_plan/aggregates/mod.rs   | 37 +++---
 datafusion/physical-expr/src/lib.rs|  2 +-
 datafusion/physical-expr/src/physical_expr.rs  | 11 +++
 datafusion/sqllogictest/test_files/groupby.slt | 26 +++
 4 files changed, 56 insertions(+), 20 deletions(-)

diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs 
b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 14350ce1bb..c31c6badd3 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -36,7 +36,7 @@ use datafusion_expr::Accumulator;
 use datafusion_physical_expr::{
 equivalence::project_equivalence_properties,
 expressions::Column,
-normalize_out_expr_with_columns_map, reverse_order_bys,
+normalize_out_expr_with_columns_map, physical_exprs_contains, 
reverse_order_bys,
 utils::{convert_to_expr, get_indices_of_matching_exprs},
 AggregateExpr, LexOrdering, LexOrderingReq, OrderingEquivalenceProperties,
 PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
@@ -575,6 +575,27 @@ fn calc_required_input_ordering(
 
Ok((!required_input_ordering.is_empty()).then_some(required_input_ordering))
 }
 
+/// Check whether group by expression contains all of the expression inside 
`requirement`
+// As an example Group By (c,b,a) contains all of the expressions in the 
`requirement`: (a ASC, b DESC)
+fn group_by_contains_all_requirements(
+group_by: ,
+requirement: ,
+) -> bool {
+let physical_exprs = group_by
+.expr()
+.iter()
+.map(|(expr, _alias)| expr.clone())
+.collect::>();
+// When we have multiple groups (grouping set)
+// since group by may be calculated on the subset of the group_by.expr()
+// it is not guaranteed to have all of the requirements among group by 
expressions.
+// Hence do the analysis: whether group by contains all requirements in 
the single group case.
+group_by.groups.len() <= 1
+&& requirement
+.iter()
+.all(|req| physical_exprs_contains(_exprs, ))
+}
+
 impl AggregateExec {
 /// Create a new hash aggregate execution plan
 pub fn try_new(
@@ -601,16 +622,22 @@ impl AggregateExec {
 .iter()
 .zip(order_by_expr)
 .map(|(aggr_expr, fn_reqs)| {
-// If the aggregation function is order-sensitive and we are
-// performing a "first stage" calculation, keep the ordering
-// requirement as is; otherwise ignore the ordering 
requirement.
+// If
+// - aggregation function is order-sensitive and
+// - aggregation is performing a "first stage" calculation, and
+// - at least one of the aggregate function requirement is not 
inside group by expression
+// keep the ordering requirement as is; otherwise ignore the 
ordering requirement.
 // In non-first stage modes, we accumulate data (using 
`merge_batch`)
 // from different partitions (i.e. merge partial results). 
During
 // this merge, we consider the ordering of each partial result.
 // Hence, we do not need to use the ordering requirement in 
such
 // modes as long as partial results are generated with the
 // correct ordering.
-fn_reqs.filter(|_| is_order_sensitive(aggr_expr) && 
mode.is_first_stage())
+fn_reqs.filter(|req| {
+is_order_sensitive(aggr_expr)
+&& mode.is_first_stage()
+&& !group_by_contains_all_requirements(_by, req)
+})
 })
 .collect::>();
 let mut aggregator_reverse_reqs = None;
diff --git a/datafusion/physical-expr/src/lib.rs 
b/datafusion/physical-expr/src/lib.rs
index a8e49bfbd6..85081c24c3 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -61,7 +61,7 @@ pub use equivalence::{
 };
 
 pub 

[arrow-datafusion] branch main updated (561e0d7e87 -> cf229b82cb)

2023-09-13 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


from 561e0d7e87 Documentation Updates for New Write Related Features (#7520)
 add cf229b82cb Move tests from repartition to enforce_distribution (#7539)

No new revisions were added by this update.

Summary of changes:
 .../src/physical_optimizer/enforce_distribution.rs | 1292 +++-
 datafusion/core/src/physical_optimizer/mod.rs  |1 -
 .../core/src/physical_optimizer/repartition.rs | 1256 ---
 3 files changed, 1283 insertions(+), 1266 deletions(-)
 delete mode 100644 datafusion/core/src/physical_optimizer/repartition.rs



  1   2   3   4   5   6   7   >