(arrow-datafusion-comet) branch main updated: feat: Document the class path / classloader issue with the shuffle manager (#256)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git

The following commit(s) were added to refs/heads/main by this push:
     new 9ab6c75  feat: Document the class path / classloader issue with the shuffle manager (#256)
9ab6c75 is described below

commit 9ab6c75f41456234f2fb93fcec15ff3cd435f49e
Author: Holden Karau
AuthorDate: Sat Apr 13 09:16:34 2024 -0700

    feat: Document the class path / classloader issue with the shuffle manager (#256)
---
 README.md | 8
 .../apache/spark/shuffle/sort/CometShuffleExternalSorter.java | 9 -
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 3b903b1..121972c 100644
--- a/README.md
+++ b/README.md
@@ -127,6 +127,14 @@ Comet shuffle feature is disabled by default. To enable it, please add related c
 Above configs enable Comet native shuffle which only supports hash partiting and single partition.
 Comet native shuffle doesn't support complext types yet.

+Comet doesn't have official release yet so currently the only way to test it is to build jar and include it in your Spark application. Depending on your deployment mode you may also need to set the driver & executor class path(s) to explicitly contain Comet otherwise Spark may use a different class-loader for the Comet components than its internal components which will then fail at runtime. For example:
+
+```
+--driver-class-path spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar
+```
+
+Some cluster managers may require additional configuration, see https://spark.apache.org/docs/latest/cluster-overview.html
+
 To enable columnar shuffle which supports all partitioning and basic complex types, one more config is required:
 ```
 --conf spark.comet.columnar.shuffle.enabled=true
diff --git a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java
index 9fe88ec..4417c4f 100644
--- a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java
+++ b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java
@@ -431,7 +431,14 @@ public final class CometShuffleExternalSorter implements CometShuffleChecksumSup
       // As we cannot access the address of the internal array in the sorter, so we need to
       // allocate the array manually and expand the pointer array in the sorter.
       // We don't want in-memory sorter to allocate memory but the initial size cannot be zero.
-      this.inMemSorter = new ShuffleInMemorySorter(allocator, 1, true);
+      try {
+        this.inMemSorter = new ShuffleInMemorySorter(allocator, 1, true);
+      } catch (java.lang.IllegalAccessError e) {
+        throw new java.lang.RuntimeException(
+            "Error loading in-memory sorter check class path -- see "
+                + "https://github.com/apache/arrow-datafusion-comet?tab=readme-ov-file#enable-comet-shuffle",
+            e);
+      }
       sorterArray = allocator.allocateArray(initialSize);
       this.inMemSorter.expandPointerArray(sorterArray);
(arrow-datafusion-comet) branch main updated: feat: Add CometRowToColumnar operator (#206)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 60fe431 feat: Add CometRowToColumnar operator (#206) 60fe431 is described below commit 60fe4315e793d7efb6a8de9246257c1b3c623766 Author: advancedxy AuthorDate: Wed Apr 10 11:59:58 2024 +0800 feat: Add CometRowToColumnar operator (#206) --- .../main/scala/org/apache/comet/CometConf.scala| 20 + .../scala/org/apache/comet/vector/NativeUtil.scala | 17 + .../org/apache/comet/vector/StreamReader.scala | 13 +- .../sql/comet/execution/arrow/ArrowWriters.scala | 472 + .../execution/arrow/CometArrowConverters.scala | 131 ++ .../org/apache/spark/sql/comet/util/Utils.scala| 15 + .../apache/comet/CometSparkSessionExtensions.scala | 56 ++- .../org/apache/comet/serde/QueryPlanSerde.scala| 3 +- .../spark/sql/comet/CometRowToColumnarExec.scala | 84 .../org/apache/spark/sql/comet/operators.scala | 5 +- .../org/apache/comet/exec/CometExecSuite.scala | 54 ++- .../scala/org/apache/spark/sql/CometTestBase.scala | 15 +- 12 files changed, 856 insertions(+), 29 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index bd2e04d..341ec98 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -337,6 +337,26 @@ object CometConf { "enabled when reading from Iceberg tables.") .booleanConf .createWithDefault(false) + + val COMET_ROW_TO_COLUMNAR_ENABLED: ConfigEntry[Boolean] = +conf("spark.comet.rowToColumnar.enabled") + .internal() + .doc(""" + |Whether to enable row to columnar conversion in Comet. 
When this is turned on, Comet will + |convert row-based operators in `spark.comet.rowToColumnar.supportedOperatorList` into + |columnar based before processing.""".stripMargin) + .booleanConf + .createWithDefault(false) + + val COMET_ROW_TO_COLUMNAR_SUPPORTED_OPERATOR_LIST: ConfigEntry[Seq[String]] = +conf("spark.comet.rowToColumnar.supportedOperatorList") + .doc( +"A comma-separated list of row-based operators that will be converted to columnar " + + "format when 'spark.comet.rowToColumnar.enabled' is true") + .stringConf + .toSequence + .createWithDefault(Seq("Range,InMemoryTableScan")) + } object ConfigHelpers { diff --git a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala index 3756da9..763ccff 100644 --- a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala +++ b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala @@ -23,6 +23,8 @@ import scala.collection.mutable import org.apache.arrow.c.{ArrowArray, ArrowImporter, ArrowSchema, CDataDictionaryProvider, Data} import org.apache.arrow.memory.RootAllocator +import org.apache.arrow.vector.VectorSchemaRoot +import org.apache.arrow.vector.dictionary.DictionaryProvider import org.apache.spark.SparkException import org.apache.spark.sql.comet.util.Utils import org.apache.spark.sql.vectorized.ColumnarBatch @@ -132,3 +134,18 @@ class NativeUtil { new ColumnarBatch(arrayVectors.toArray, maxNumRows) } } + +object NativeUtil { + def rootAsBatch(arrowRoot: VectorSchemaRoot): ColumnarBatch = { +rootAsBatch(arrowRoot, null) + } + + def rootAsBatch(arrowRoot: VectorSchemaRoot, provider: DictionaryProvider): ColumnarBatch = { +val vectors = (0 until arrowRoot.getFieldVectors.size()).map { i => + val vector = arrowRoot.getFieldVectors.get(i) + // Native shuffle always uses decimal128. 
+ CometVector.getVector(vector, true, provider) +} +new ColumnarBatch(vectors.toArray, arrowRoot.getRowCount) + } +} diff --git a/common/src/main/scala/org/apache/comet/vector/StreamReader.scala b/common/src/main/scala/org/apache/comet/vector/StreamReader.scala index da72383..61d800b 100644 --- a/common/src/main/scala/org/apache/comet/vector/StreamReader.scala +++ b/common/src/main/scala/org/apache/comet/vector/StreamReader.scala @@ -21,13 +21,11 @@ package org.apache.comet.vector import java.nio.channels.ReadableByteChannel -import scala.collection.JavaConverters.collectionAsScalaIterableConverter - import org.apache.arrow.memory.RootAllocator import org.apache.arrow.vector.VectorSchemaRoot import org.apache.arrow.vector.ipc.{ArrowStreamReader, ReadChannel} import org.apache.arrow.vector.ipc.message.MessageChannelReader -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} +import org.apache.spark.sql.vectorized.ColumnarBatch
(arrow-datafusion-comet) branch main updated: feat: Remove use of nightly int_roundings (#228)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git

The following commit(s) were added to refs/heads/main by this push:
     new b0234a6  feat: Remove use of nightly int_roundings (#228)
b0234a6 is described below

commit b0234a65792dc2acf9fe40f52cd9ef94d1aa014d
Author: Vrishabh
AuthorDate: Mon Mar 25 05:15:32 2024 +0530

    feat: Remove use of nightly int_roundings (#228)
---
 core/src/execution/datafusion/expressions/avg_decimal.rs  | 4 ++--
 core/src/execution/datafusion/expressions/scalar_funcs.rs | 9 ++---
 core/src/execution/datafusion/expressions/utils.rs        | 5 +++--
 core/src/lib.rs                                           | 1 -
 4 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/core/src/execution/datafusion/expressions/avg_decimal.rs b/core/src/execution/datafusion/expressions/avg_decimal.rs
index 6fb5581..d99ed04 100644
--- a/core/src/execution/datafusion/expressions/avg_decimal.rs
+++ b/core/src/execution/datafusion/expressions/avg_decimal.rs
@@ -34,7 +34,7 @@ use arrow_data::decimal::{
     validate_decimal_precision, MAX_DECIMAL_FOR_EACH_PRECISION, MIN_DECIMAL_FOR_EACH_PRECISION,
 };

-use num::Integer;
+use num::{integer::div_ceil, Integer};
 use DataType::*;

 /// AVG aggregate expression
@@ -514,7 +514,7 @@ fn avg(sum: i128, count: i128, target_min: i128, target_max: i128, scaler: i128)
     if let Some(value) = sum.checked_mul(scaler) {
         // `sum / count` with ROUND_HALF_UP
         let (div, rem) = value.div_rem();
-        let half = count.div_ceil(2);
+        let half = div_ceil(count, 2);
         let half_neg = half.neg_wrapping();
         let new_value = match value >= 0 {
             true if rem >= half => div.add_wrapping(1),
diff --git a/core/src/execution/datafusion/expressions/scalar_funcs.rs b/core/src/execution/datafusion/expressions/scalar_funcs.rs
index 78775cd..4fca723 100644
--- a/core/src/execution/datafusion/expressions/scalar_funcs.rs
+++ b/core/src/execution/datafusion/expressions/scalar_funcs.rs
@@ -37,7 +37,10 @@ use datafusion_common::{
 use datafusion_physical_expr::{
     execution_props::ExecutionProps, functions::create_physical_fun, math_expressions,
 };
-use num::{BigInt, Signed, ToPrimitive};
+use num::{
+    integer::{div_ceil, div_floor},
+    BigInt, Signed, ToPrimitive,
+};
 use unicode_segmentation::UnicodeSegmentation;

 /// Create a physical scalar function.
@@ -249,13 +252,13 @@ fn long_to_decimal(v: , precision: u8) -> Option {
 #[inline]
 fn decimal_ceil_f(scale: ) -> impl Fn(i128) -> i128 {
     let div = 10_i128.pow_wrapping(*scale as u32);
-    move |x: i128| x.div_ceil(div)
+    move |x: i128| div_ceil(x, div)
 }

 #[inline]
 fn decimal_floor_f(scale: ) -> impl Fn(i128) -> i128 {
     let div = 10_i128.pow_wrapping(*scale as u32);
-    move |x: i128| x.div_floor(div)
+    move |x: i128| div_floor(x, div)
 }

 // Spark uses BigDecimal. See RoundBase implementation in Spark. Instead, we do the same by
diff --git a/core/src/execution/datafusion/expressions/utils.rs b/core/src/execution/datafusion/expressions/utils.rs
index ec0cf22..30a4692 100644
--- a/core/src/execution/datafusion/expressions/utils.rs
+++ b/core/src/execution/datafusion/expressions/utils.rs
@@ -31,6 +31,7 @@ use arrow_schema::DataType;
 use chrono::{DateTime, Offset, TimeZone};
 use datafusion_common::cast::as_generic_string_array;
 use datafusion_physical_expr::PhysicalExpr;
+use num::integer::div_floor;
 use std::{any::Any, sync::Arc};

 /// An utility function from DataFusion. It is not exposed by DataFusion.
@@ -198,13 +199,13 @@ pub(crate) fn spark_cast(array: ArrayRef, from_type: , to_type:
     match (from_type, to_type) {
         (DataType::Timestamp(_, _), DataType::Int64) => {
             // See Spark's `Cast` expression
-            unary_dyn::<_, Int64Type>(, |v| v.div_floor(MICROS_PER_SECOND)).unwrap()
+            unary_dyn::<_, Int64Type>(, |v| div_floor(v, MICROS_PER_SECOND)).unwrap()
         }
         (DataType::Dictionary(_, value_type), DataType::Int64)
             if matches!(value_type.as_ref(), ::Timestamp(_, _)) =>
         {
             // See Spark's `Cast` expression
-            unary_dyn::<_, Int64Type>(, |v| v.div_floor(MICROS_PER_SECOND)).unwrap()
+            unary_dyn::<_, Int64Type>(, |v| div_floor(v, MICROS_PER_SECOND)).unwrap()
         }
         (DataType::Timestamp(_, _), DataType::Utf8) => remove_trailing_zeroes(array),
         (DataType::Dictionary(_, value_type), DataType::Utf8)
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 2e85136..f209859 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -21,7 +21,6 @@
 #![allow(clippy::upper_case_acronyms)]
 #![allow(clippy::derive_partial_eq_without_eq)] // For prost generated struct
 #![cfg_attr(feature = "nightly",
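
The commit above swaps the nightly-only `int_roundings` methods for `num::integer::div_ceil` and `num::integer::div_floor`. As a rough illustration of what those two operations compute, here is a minimal sketch using only the stable standard library. The helper names `floor_div`, `ceil_div` and `avg_half_up` are made up for this note and are not Comet or `num` APIs. `avg_half_up` only mirrors the idea of the "`sum / count` with ROUND_HALF_UP" step; it assumes `count > 0`, assumes the negative branch is symmetric to the positive one shown in the diff, and leaves out the overflow and precision checks of the real code.

```rust
/// Floor division: rounds the quotient toward negative infinity.
/// Hypothetical helper; assumes `d != 0` and no overflow.
fn floor_div(n: i128, d: i128) -> i128 {
    let q = n / d; // Rust's `/` truncates toward zero
    let r = n % d; // remainder has the sign of `n` (or is zero)
    // A non-zero remainder whose sign differs from the divisor means the
    // exact quotient is negative and was truncated upward, so step down.
    if r != 0 && ((r < 0) != (d < 0)) {
        q - 1
    } else {
        q
    }
}

/// Ceiling division: rounds the quotient toward positive infinity.
/// Hypothetical helper; assumes `d != 0` and no overflow.
fn ceil_div(n: i128, d: i128) -> i128 {
    let q = n / d;
    let r = n % d;
    // A non-zero remainder with the same sign as the divisor means the
    // exact quotient is positive and was truncated downward, so step up.
    if r != 0 && ((r < 0) == (d < 0)) {
        q + 1
    } else {
        q
    }
}

/// `sum / count` rounded half-up (ties away from zero), assuming `count > 0`.
fn avg_half_up(sum: i128, count: i128) -> i128 {
    let div = sum / count;
    let rem = sum % count;
    let half = ceil_div(count, 2);
    if sum >= 0 && rem >= half {
        div + 1
    } else if sum < 0 && rem <= -half {
        div - 1
    } else {
        div
    }
}
```

Floor and truncating division differ only for negative inputs, which is why the timestamp cast in the diff needs `div_floor`: under these helpers, `floor_div(-1, 1_000_000)` is `-1`, while plain `/` would give `0`.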
(arrow-datafusion-comet) branch main updated: test: Follow up on Spark 3.4 diff (#209)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 5f58010 test: Follow up on Spark 3.4 diff (#209) 5f58010 is described below commit 5f58010c90502d372a110179e03f11977e0166c9 Author: Chao Sun AuthorDate: Mon Mar 18 10:41:43 2024 -0700 test: Follow up on Spark 3.4 diff (#209) --- dev/diffs/3.4.2.diff | 207 +-- 1 file changed, 102 insertions(+), 105 deletions(-) diff --git a/dev/diffs/3.4.2.diff b/dev/diffs/3.4.2.diff index b571cd2..590e1f4 100644 --- a/dev/diffs/3.4.2.diff +++ b/dev/diffs/3.4.2.diff @@ -234,51 +234,6 @@ index 56e9520fdab..917932336df 100644 spark.range(50).write.saveAsTable(s"$dbName.$table1Name") spark.range(100).write.saveAsTable(s"$dbName.$table2Name") -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DisableComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/DisableComet.scala -new file mode 100644 -index 000..07687f6685a /dev/null -+++ b/sql/core/src/test/scala/org/apache/spark/sql/DisableComet.scala -@@ -0,0 +1,39 @@ -+/* -+ * Licensed to the Apache Software Foundation (ASF) under one or more -+ * contributor license agreements. See the NOTICE file distributed with -+ * this work for additional information regarding copyright ownership. -+ * The ASF licenses this file to You under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ *http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. 
-+ */ -+ -+package org.apache.spark.sql -+ -+import org.scalactic.source.Position -+import org.scalatest.Tag -+ -+import org.apache.spark.sql.test.SQLTestUtils -+ -+case class DisableComet(reason: String) extends Tag("DisableComet") -+ -+/** -+ * Helper trait that disables Comet for all tests regardless of default config values. -+ */ -+trait DisableCometSuite extends SQLTestUtils { -+ override protected def test(testName: String, testTags: Tag*)(testFun: => Any) -+(implicit pos: Position): Unit = { -+if (isCometEnabled) { -+ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) -+} else { -+ super.test(testName, testTags: _*)(testFun) -+} -+ } -+} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala index f33432ddb6f..fe9f74ff8f1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -311,7 +266,7 @@ index f33432ddb6f..fe9f74ff8f1 100644 } assert(scanOption.isDefined) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala -index a6b295578d6..d5e25564bb9 100644 +index a6b295578d6..a5cb616945a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -463,7 +463,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite @@ -320,7 +275,7 @@ index a6b295578d6..d5e25564bb9 100644 - test("Explain formatted output for scan operator for datasource V2") { + test("Explain formatted output for scan operator for datasource V2", -+ DisableComet("Comet explain output is different")) { ++ IgnoreComet("Comet explain output is different")) { withTempDir { dir => Seq("parquet", "orc", "csv", "json").foreach { fmt => val basePath = dir.getCanonicalPath + "/" + fmt @@ -361,6 +316,54 @@ index 
2796b1cf154..94591f83c84 100644 }.flatten assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, Literal(5L } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala +new file mode 100644 +index 000..4b31bea33de +--- /dev/null b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala +@@ -0,0 +1,42 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work f
(arrow-datafusion-comet) branch main updated: feat: Support BloomFilterMightContain expr (#179)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 969f683 feat: Support BloomFilterMightContain expr (#179) 969f683 is described below commit 969f683c188fbcf2956da784b10777445f9adbc0 Author: advancedxy AuthorDate: Fri Mar 15 05:24:16 2024 +0800 feat: Support BloomFilterMightContain expr (#179) --- core/src/common/bit.rs | 11 ++ .../expressions/bloom_filter_might_contain.rs | 152 + core/src/execution/datafusion/expressions/mod.rs | 1 + core/src/execution/datafusion/mod.rs | 1 + core/src/execution/datafusion/planner.rs | 10 ++ core/src/execution/datafusion/spark_hash.rs| 2 +- core/src/execution/datafusion/{ => util}/mod.rs| 9 +- .../execution/datafusion/util/spark_bit_array.rs | 131 ++ .../datafusion/util/spark_bloom_filter.rs | 98 + core/src/execution/proto/expr.proto| 6 + pom.xml| 10 ++ spark/pom.xml | 18 +++ .../org/apache/comet/serde/QueryPlanSerde.scala| 17 +++ .../apache/comet/shims/ShimQueryPlanSerde.scala| 7 +- .../apache/comet/CometExpression3_3PlusSuite.scala | 106 ++ 15 files changed, 570 insertions(+), 9 deletions(-) diff --git a/core/src/common/bit.rs b/core/src/common/bit.rs index 4af560f..f736347 100644 --- a/core/src/common/bit.rs +++ b/core/src/common/bit.rs @@ -131,6 +131,17 @@ pub fn read_num_bytes_u32(size: usize, src: &[u8]) -> u32 { trailing_bits(v as u64, size * 8) as u32 } +/// Similar to the `read_num_bytes` but read nums from bytes in big-endian order +/// This is used to read bytes from Java's OutputStream which writes bytes in big-endian +macro_rules! 
read_num_be_bytes { +($ty:ty, $size:expr, $src:expr) => {{ +debug_assert!($size <= $src.len()); +let mut buffer = <$ty as $crate::common::bit::FromBytes>::Buffer::default(); +buffer.as_mut()[..$size].copy_from_slice(&$src[..$size]); +<$ty>::from_be_bytes(buffer) +}}; +} + /// Converts value `val` of type `T` to a byte vector, by reading `num_bytes` from `val`. /// NOTE: if `val` is less than the size of `T` then it can be truncated. #[inline] diff --git a/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs b/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs new file mode 100644 index 000..dd90cd8 --- /dev/null +++ b/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::{ +execution::datafusion::util::spark_bloom_filter::SparkBloomFilter, parquet::data_type::AsBytes, +}; +use arrow::record_batch::RecordBatch; +use arrow_array::cast::as_primitive_array; +use arrow_schema::{DataType, Schema}; +use datafusion::physical_plan::ColumnarValue; +use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue}; +use datafusion_physical_expr::{aggregate::utils::down_cast_any_ref, PhysicalExpr}; +use std::{ +any::Any, +fmt::Display, +hash::{Hash, Hasher}, +sync::Arc, +}; + +/// A physical expression that checks if a value might be in a bloom filter. It corresponds to the +/// Spark's `BloomFilterMightContain` expression. +#[derive(Debug, Hash)] +pub struct BloomFilterMightContain { +pub bloom_filter_expr: Arc, +pub value_expr: Arc, +bloom_filter: Option, +} + +impl Display for BloomFilterMightContain { +fn fmt(, f: std::fmt::Formatter) -> std::fmt::Result { +write!( +f, +"BloomFilterMightContain [bloom_filter_expr: {}, value_expr: {}]", +self.bloom_filter_expr, self.value_expr +) +} +} + +impl PartialEq for BloomFilterMightContain { +fn eq(, _other: Any) -> bool { +down_cast_any
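
The `read_num_be_bytes` macro added in `bit.rs` copies `$size` bytes into a zeroed buffer and decodes it with `from_be_bytes`. A minimal sketch of the same idea, specialised to `i32` and written as a plain function, may make the byte order easier to see. The name `read_i32_be` is hypothetical and is not part of Comet; the real macro is generic over the target type.

```rust
/// Hypothetical stand-in for the macro in the diff, specialised to `i32`.
/// Copies the first `size` bytes of `src` into a zeroed 4-byte buffer and
/// interprets the buffer as a big-endian integer.
fn read_i32_be(size: usize, src: &[u8]) -> i32 {
    debug_assert!(size <= src.len() && size <= 4);
    let mut buffer = [0u8; 4];
    buffer[..size].copy_from_slice(&src[..size]);
    i32::from_be_bytes(buffer)
}
```

With a full-width read, the bytes `[0, 0, 0, 1]` decode to `1`, which matches how Java's `DataOutputStream.writeInt(1)` lays out the value. One consequence of this copying scheme is that a short read fills the high-order bytes first, so `read_i32_be(2, &[1, 2])` yields `0x0102_0000` rather than `0x0102`.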
(arrow-datafusion-comet) branch main updated: feat: Support bitwise aggregate functions (#197)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new ed5de4b feat: Support bitwise aggregate functions (#197) ed5de4b is described below commit ed5de4bcb8f8b64fb273e77dcb87f07b9f417984 Author: Huaxin Gao AuthorDate: Thu Mar 14 08:33:36 2024 -0700 feat: Support bitwise aggregate functions (#197) --- core/src/execution/datafusion/planner.rs | 21 ++-- core/src/execution/proto/expr.proto| 18 +++ .../org/apache/comet/serde/QueryPlanSerde.scala| 60 +- .../apache/comet/exec/CometAggregateSuite.scala| 50 ++ 4 files changed, 145 insertions(+), 4 deletions(-) diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs index ef2787f..d52ec80 100644 --- a/core/src/execution/datafusion/planner.rs +++ b/core/src/execution/datafusion/planner.rs @@ -27,9 +27,9 @@ use datafusion::{ physical_expr::{ execution_props::ExecutionProps, expressions::{ -in_list, BinaryExpr, CaseExpr, CastExpr, Column, Count, FirstValue, InListExpr, -IsNotNullExpr, IsNullExpr, LastValue, Literal as DataFusionLiteral, Max, Min, -NegativeExpr, NotExpr, Sum, UnKnownColumn, +in_list, BinaryExpr, BitAnd, BitOr, BitXor, CaseExpr, CastExpr, Column, Count, +FirstValue, InListExpr, IsNotNullExpr, IsNullExpr, LastValue, +Literal as DataFusionLiteral, Max, Min, NegativeExpr, NotExpr, Sum, UnKnownColumn, }, functions::create_physical_expr, AggregateExpr, PhysicalExpr, PhysicalSortExpr, ScalarFunctionExpr, @@ -940,6 +940,21 @@ impl PhysicalPlanner { vec![], ))) } +AggExprStruct::BitAndAgg(expr) => { +let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?; +let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); +Ok(Arc::new(BitAnd::new(child, "bit_and", datatype))) +} +AggExprStruct::BitOrAgg(expr) => { +let child = 
self.create_expr(expr.child.as_ref().unwrap(), schema)?; +let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); +Ok(Arc::new(BitOr::new(child, "bit_or", datatype))) +} +AggExprStruct::BitXorAgg(expr) => { +let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?; +let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); +Ok(Arc::new(BitXor::new(child, "bit_xor", datatype))) +} } } diff --git a/core/src/execution/proto/expr.proto b/core/src/execution/proto/expr.proto index 8aa81b7..e8d35d1 100644 --- a/core/src/execution/proto/expr.proto +++ b/core/src/execution/proto/expr.proto @@ -88,6 +88,9 @@ message AggExpr { Avg avg = 6; First first = 7; Last last = 8; +BitAndAgg bitAndAgg = 9; +BitOrAgg bitOrAgg = 10; +BitXorAgg bitXorAgg = 11; } } @@ -130,6 +133,21 @@ message Last { bool ignore_nulls = 3; } +message BitAndAgg { + Expr child = 1; + DataType datatype = 2; +} + +message BitOrAgg { + Expr child = 1; + DataType datatype = 2; +} + +message BitXorAgg { + Expr child = 1; + DataType datatype = 2; +} + message Literal { oneof value { bool bool_val = 1; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 5da926e..87b4dff 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, Count, Final, First, Last, Max, Min, Partial, Sum} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, BitAndAgg, BitOrAgg, BitXorAgg, Count, Final, First, Last, Max, Min, Partial, Sum} import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import 
org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, SinglePartition} @@ -188,6 +188,13 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { } } + private def bitwiseAggTypeSupported(dt: DataType): Boolean = { +dt match { + case _: IntegerType | LongType | ShortType | ByteType => true + case _ => false +}
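
For reference, the three aggregates wired up in this commit fold a column of integral values with `&`, `|` and `^`. The sketch below models that on a slice of optional `i64` values. It assumes nulls are skipped and that an input with no non-null values yields null, which is my understanding of Spark's `bit_and`/`bit_or`/`bit_xor` but is not stated in the diff. The function names are illustrative and unrelated to the DataFusion `BitAnd`, `BitOr` and `BitXor` types.

```rust
/// Folds the non-null values with `op`; returns `None` when there are none.
/// Hypothetical helper for illustration only.
fn bit_agg(values: &[Option<i64>], op: fn(i64, i64) -> i64) -> Option<i64> {
    values.iter().flatten().copied().reduce(op)
}

fn bit_and(values: &[Option<i64>]) -> Option<i64> {
    bit_agg(values, |a, b| a & b)
}

fn bit_or(values: &[Option<i64>]) -> Option<i64> {
    bit_agg(values, |a, b| a | b)
}

fn bit_xor(values: &[Option<i64>]) -> Option<i64> {
    bit_agg(values, |a, b| a ^ b)
}
```

The Scala-side `bitwiseAggTypeSupported` check in the diff restricts these aggregates to byte, short, int and long inputs, which is why a single `i64` model is enough here.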
(arrow-datafusion-comet) branch main updated: build: Enforce scalafix in CI (#203)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git

The following commit(s) were added to refs/heads/main by this push:
     new d069713  build: Enforce scalafix in CI (#203)
d069713 is described below

commit d0697132e83004e8960c11d5a7603b4c21fbb956
Author: advancedxy
AuthorDate: Wed Mar 13 15:03:49 2024 +0800

    build: Enforce scalafix in CI (#203)
---
 .github/actions/java-test/action.yaml | 2 +-
 DEVELOPMENT.md | 4
 Makefile | 1 +
 .../src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala | 1 -
 spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 2 +-
 spark/src/test/scala/org/apache/comet/CometCastSuite.scala | 4 ++--
 spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala | 2 +-
 7 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/.github/actions/java-test/action.yaml b/.github/actions/java-test/action.yaml
index f82f05b..e1efd9f 100644
--- a/.github/actions/java-test/action.yaml
+++ b/.github/actions/java-test/action.yaml
@@ -49,7 +49,7 @@ runs:
     - name: Run Maven compile
       shell: bash
       run: |
-        ./mvnw -B compile test-compile scalafix:scalafix -Psemanticdb ${{ inputs.maven_opts }}
+        ./mvnw -B compile test-compile scalafix:scalafix -Dscalafix.mode=CHECK -Psemanticdb ${{ inputs.maven_opts }}

     - name: Run tests
       shell: bash
diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md
index 25a6599..6dc0f1f 100644
--- a/DEVELOPMENT.md
+++ b/DEVELOPMENT.md
@@ -84,3 +84,7 @@ in the respective source code, e.g., `CometTPCHQueryBenchmark`.
 ## Debugging
 Comet is a multi-language project with native code written in Rust and JVM code written in Java and Scala.
 It is possible to debug both native and JVM code concurrently as described in the [DEBUGGING guide](DEBUGGING.md)
+
+## Submitting a Pull Request
+Comet uses `cargo fmt`, [Scalafix](https://github.com/scalacenter/scalafix) and [Spotless](https://github.com/diffplug/spotless/tree/main/plugin-maven) to
+automatically format the code. Before submitting a pull request, you can simply run `make format` to format the code.
\ No newline at end of file
diff --git a/Makefile b/Makefile
index 6f599a0..ca5d756 100644
--- a/Makefile
+++ b/Makefile
@@ -38,6 +38,7 @@ clean:
 bench:
 	cd core && RUSTFLAGS="-Ctarget-cpu=native" cargo bench $(filter-out $@,$(MAKECMDGOALS))
 format:
+	cd core && cargo fmt
 	./mvnw compile test-compile scalafix:scalafix -Psemanticdb $(PROFILES)
 	./mvnw spotless:apply $(PROFILES)

diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 87c2265..5720b69 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -26,7 +26,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.SparkSessionExtensions
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.comet._
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle}
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 08a499b..902f703 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
-import org.apache.spark.sql.comet.{CometHashAggregateExec, CometPlan, CometSinkPlaceHolder, DecimalPrecision}
+import org.apache.spark.sql.comet.{CometSinkPlaceHolder, DecimalPrecision}
 import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 565d226..317371f 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -90,13 +90,13 @@ class CometCastSuite extends CometTestBase with AdaptiveSpar
(arrow-datafusion-comet) branch main updated: feat: Enable Comet shuffle manager for Comet shell (#204)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git

The following commit(s) were added to refs/heads/main by this push:
     new 5d9d95e  feat: Enable Comet shuffle manager for Comet shell (#204)
5d9d95e is described below

commit 5d9d95e9832de516f4fa677e9d6e7cd0134d1bec
Author: Junfan Zhang
AuthorDate: Wed Mar 13 15:03:09 2024 +0800

    feat: Enable Comet shuffle manager for Comet shell (#204)
---
 bin/comet-spark-shell | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/bin/comet-spark-shell b/bin/comet-spark-shell
index 9ae55a1..72100d2 100755
--- a/bin/comet-spark-shell
+++ b/bin/comet-spark-shell
@@ -81,4 +81,6 @@ RUST_BACKTRACE=1 $SPARK_HOME/bin/spark-shell \
   --conf spark.comet.enabled=true \
   --conf spark.comet.exec.enabled=true \
   --conf spark.comet.exec.all.enabled=true \
+  --conf spark.comet.exec.shuffle.enabled=true \
+  --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
   $@
(arrow-datafusion-comet) branch main updated: fix: Include active spiller when computing peak shuffle memory (#196)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new e920aa4 fix: Include active spiller when computing peak shuffle memory (#196) e920aa4 is described below commit e920aa4f956478c7cdcb939a1689779daef04b09 Author: Chao Sun AuthorDate: Tue Mar 12 22:50:54 2024 -0700 fix: Include active spiller when computing peak shuffle memory (#196) --- .../org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java | 4 1 file changed, 4 insertions(+) diff --git a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java index aa806e2..9fe88ec 100644 --- a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java +++ b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java @@ -257,6 +257,9 @@ public final class CometShuffleExternalSorter implements CometShuffleChecksumSup for (SpillSorter sorter : spillingSorters) { totalPageSize += sorter.getMemoryUsage(); } +if (activeSpillSorter != null) { + totalPageSize += activeSpillSorter.getMemoryUsage(); +} return totalPageSize; } @@ -274,6 +277,7 @@ public final class CometShuffleExternalSorter implements CometShuffleChecksumSup } private long freeMemory() { +updatePeakMemoryUsed(); long memoryFreed = 0; if (isAsync) { for (SpillSorter sorter : spillingSorters) {
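The effect of this fix can be illustrated with a small, self-contained sketch. It is not Comet's code: `SketchSorter` and `PeakMemoryTracker` are hypothetical names, and only the two behaviours touched by the diff are modelled, assuming that memory usage is a plain sum over sorters. The active sorter is counted in the total, and the peak is refreshed before memory is released.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a spill sorter; only its memory usage matters here. */
final class SketchSorter {
  private final long memoryUsage;

  SketchSorter(long memoryUsage) {
    this.memoryUsage = memoryUsage;
  }

  long getMemoryUsage() {
    return memoryUsage;
  }
}

/** Hypothetical tracker mirroring the shape of the patched methods. */
final class PeakMemoryTracker {
  private final List<SketchSorter> spillingSorters = new ArrayList<>();
  private SketchSorter activeSorter; // may be null, as in the diff
  private long peakMemoryUsed = 0;

  void addSpilling(SketchSorter sorter) {
    spillingSorters.add(sorter);
  }

  void setActive(SketchSorter sorter) {
    activeSorter = sorter;
  }

  /** Sums the spilling sorters and, if present, the active one. */
  long getMemoryUsage() {
    long total = 0;
    for (SketchSorter sorter : spillingSorters) {
      total += sorter.getMemoryUsage();
    }
    if (activeSorter != null) {
      total += activeSorter.getMemoryUsage();
    }
    return total;
  }

  void updatePeakMemoryUsed() {
    long current = getMemoryUsage();
    if (current > peakMemoryUsed) {
      peakMemoryUsed = current;
    }
  }

  /** Records the peak first, so the usage about to be released is not lost. */
  long freeMemory() {
    updatePeakMemoryUsed();
    long freed = getMemoryUsage();
    spillingSorters.clear();
    activeSorter = null;
    return freed;
  }

  long getPeakMemoryUsed() {
    updatePeakMemoryUsed();
    return peakMemoryUsed;
  }
}
```

In this sketch, dropping either the `activeSorter` term or the `updatePeakMemoryUsed()` call at the top of `freeMemory()` makes the reported peak lower than the memory that was actually held.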
(arrow-datafusion-comet) branch main updated: build: Run Spark SQL tests for 3.4 (#166)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 6bedce4 build: Run Spark SQL tests for 3.4 (#166) 6bedce4 is described below commit 6bedce410f5a5b961eb2cc1dbc8840c17bdab4bb Author: Chao Sun AuthorDate: Tue Mar 12 10:28:58 2024 -0700 build: Run Spark SQL tests for 3.4 (#166) --- .github/actions/setup-spark-builder/action.yaml | 64 ++ .github/workflows/spark_sql_test.yml| 79 ++ dev/diffs/3.4.2.diff| 1306 +++ pom.xml |1 + 4 files changed, 1450 insertions(+) diff --git a/.github/actions/setup-spark-builder/action.yaml b/.github/actions/setup-spark-builder/action.yaml new file mode 100644 index 000..9293dee --- /dev/null +++ b/.github/actions/setup-spark-builder/action.yaml @@ -0,0 +1,64 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +name: Setup Spark Builder +description: 'Setup Apache Spark to run SQL tests' +inputs: + spark-short-version: +description: 'The Apache Spark short version (e.g., 3.4) to build' +required: true +default: '3.4' + spark-version: +description: 'The Apache Spark version (e.g., 3.4.2) to build' +required: true +default: '3.4.2' + comet-version: +description: 'The Comet version to use for Spark' +required: true +default: '0.1.0-SNAPSHOT' +runs: + using: "composite" + steps: +- name: Clone Spark repo + uses: actions/checkout@v4 + with: +repository: apache/spark +path: apache-spark +ref: v${{inputs.spark-version}} +fetch-depth: 1 + +- name: Setup Spark for Comet + shell: bash + run: | +cd apache-spark +git apply ../dev/diffs/${{inputs.spark-version}}.diff +../mvnw -nsu -q versions:set-property -Dproperty=comet.version -DnewVersion=${{inputs.comet-version}} -DgenerateBackupPoms=false + +- name: Cache Maven dependencies + uses: actions/cache@v4 + with: +path: | + ~/.m2/repository + /root/.m2/repository +key: ${{ runner.os }}-spark-sql-${{ hashFiles('spark/**/pom.xml', 'common/**/pom.xml') }} +restore-keys: | + ${{ runner.os }}-spark-sql- + +- name: Build Comet + shell: bash + run: | +PROFILES="-Pspark-${{inputs.spark-short-version}}" make release diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml new file mode 100644 index 000..5c460b7 --- /dev/null +++ b/.github/workflows/spark_sql_test.yml @@ -0,0 +1,79 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: Spark SQL Tests + +concurrency: + group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }} + cancel-in-progress: true + +on: + push: +paths-ignore: + - "doc/**" + - "**.md" + pull_request: +paths-ignore: + - "doc/**" + - "**.md" + # manual trigger + # https://docs.github.com/en/actions/managing-workflow-runs/manually-running-a-workflow + workflow_dispatch: + +env: + RUST_VERSION: nightly + +jobs: + spark-sql-catalyst: +strategy: + matrix: +os: [ubuntu-latest]
(arrow-datafusion-comet) branch main updated: fix: Try to convert a static list into a set in Rust (#184)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 5dc865d fix: Try to convert a static list into a set in Rust (#184) 5dc865d is described below commit 5dc865d5f31e003b1800d5ec6c04586d9057b462 Author: advancedxy AuthorDate: Tue Mar 12 11:37:30 2024 +0800 fix: Try to convert a static list into a set in Rust (#184) --- core/src/execution/datafusion/planner.rs | 31 --- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs index 33cf636..ef2787f 100644 --- a/core/src/execution/datafusion/planner.rs +++ b/core/src/execution/datafusion/planner.rs @@ -25,9 +25,14 @@ use datafusion::{ common::DataFusionError, logical_expr::{BuiltinScalarFunction, Operator as DataFusionOperator}, physical_expr::{ -expressions::{BinaryExpr, Column, IsNotNullExpr, Literal as DataFusionLiteral}, +execution_props::ExecutionProps, +expressions::{ +in_list, BinaryExpr, CaseExpr, CastExpr, Column, Count, FirstValue, InListExpr, +IsNotNullExpr, IsNullExpr, LastValue, Literal as DataFusionLiteral, Max, Min, +NegativeExpr, NotExpr, Sum, UnKnownColumn, +}, functions::create_physical_expr, -PhysicalExpr, PhysicalSortExpr, +AggregateExpr, PhysicalExpr, PhysicalSortExpr, ScalarFunctionExpr, }, physical_plan::{ aggregates::{AggregateMode as DFAggregateMode, PhysicalGroupBy}, @@ -39,14 +44,6 @@ use datafusion::{ }, }; use datafusion_common::ScalarValue; -use datafusion_physical_expr::{ -execution_props::ExecutionProps, -expressions::{ -CaseExpr, CastExpr, Count, FirstValue, InListExpr, IsNullExpr, LastValue, Max, Min, -NegativeExpr, NotExpr, Sum, UnKnownColumn, -}, -AggregateExpr, ScalarFunctionExpr, -}; use itertools::Itertools; use jni::objects::GlobalRef; use num::{BigInt, ToPrimitive}; @@ -496,7 +493,19 @@ 
impl PhysicalPlanner { .iter() .map(|x| self.create_expr(x, input_schema.clone()).unwrap()) .collect::<Vec<_>>(); -Ok(Arc::new(InListExpr::new(value, list, expr.negated, None))) + +// if schema contains any dictionary type, we should use InListExpr instead of +// in_list as it doesn't handle value being dictionary type correctly +let contains_dict_type = input_schema +.fields() +.iter() +.any(|f| matches!(f.data_type(), DataType::Dictionary(_, _))); +if contains_dict_type { +// TODO: remove the fallback when https://github.com/apache/arrow-datafusion/issues/9530 is fixed +Ok(Arc::new(InListExpr::new(value, list, expr.negated, None))) +} else { +in_list(value, list, &expr.negated, input_schema.as_ref()).map_err(|e| e.into()) +} } ExprStruct::If(expr) => { let if_expr =
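The idea named in the commit title, converting a static list into a set, can be sketched independently of DataFusion. The class below is a hypothetical illustration in Java, not the DataFusion implementation. It assumes integer values and ignores SQL NULL semantics. When the list is known up front, it is hashed once and each probe becomes a set lookup; otherwise every probe scans the list.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical IN-list evaluator; names and shape are illustrative only. */
final class InListSketch {
  private final List<Integer> list;
  private final Set<Integer> staticSet; // null when the linear-scan fallback is used
  private final boolean negated;

  InListSketch(List<Integer> list, boolean negated, boolean useSet) {
    this.list = list;
    this.negated = negated;
    // Build the set once at construction time, not once per evaluated row.
    this.staticSet = useSet ? new HashSet<>(list) : null;
  }

  /** Returns the result of `value IN (list)`, or `NOT IN` when negated. */
  boolean eval(int value) {
    boolean found = staticSet != null ? staticSet.contains(value) : list.contains(value);
    return found != negated;
  }
}
```

Both paths return the same answers; only the per-row cost differs. That matches the shape of the diff, where a fallback path is kept for inputs the faster path does not handle correctly.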
(arrow-datafusion-comet) branch main updated: feat: Introduce `CometTaskMemoryManager` and native side memory pool (#83)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new e83635a feat: Introduce `CometTaskMemoryManager` and native side memory pool (#83) e83635a is described below commit e83635ada25e8494cb43d88c4e6de6a40530a8ca Author: Chao Sun AuthorDate: Tue Mar 5 21:51:02 2024 -0800 feat: Introduce `CometTaskMemoryManager` and native side memory pool (#83) --- .../org/apache/comet/CometOutOfMemoryError.java| 27 + core/src/errors.rs | 6 +- core/src/execution/jni_api.rs | 44 +--- core/src/execution/memory_pool.rs | 119 + core/src/execution/mod.rs | 3 + core/src/jvm_bridge/comet_task_memory_manager.rs | 62 +++ core/src/jvm_bridge/mod.rs | 8 +- dev/ensure-jars-have-correct-contents.sh | 2 + .../org/apache/spark/CometTaskMemoryManager.java | 77 + .../scala/org/apache/comet/CometExecIterator.scala | 14 ++- spark/src/main/scala/org/apache/comet/Native.scala | 7 +- .../comet/exec/CometColumnarShuffleSuite.scala | 3 +- .../apache/spark/sql/CometTPCDSQuerySuite.scala| 4 +- .../org/apache/spark/sql/CometTPCHQuerySuite.scala | 4 +- .../scala/org/apache/spark/sql/CometTestBase.scala | 31 ++ 15 files changed, 370 insertions(+), 41 deletions(-) diff --git a/common/src/main/java/org/apache/comet/CometOutOfMemoryError.java b/common/src/main/java/org/apache/comet/CometOutOfMemoryError.java new file mode 100644 index 000..8a9e8d1 --- /dev/null +++ b/common/src/main/java/org/apache/comet/CometOutOfMemoryError.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet; + +/** OOM error specific for Comet memory management */ +public class CometOutOfMemoryError extends OutOfMemoryError { + public CometOutOfMemoryError(String msg) { +super(msg); + } +} diff --git a/core/src/errors.rs b/core/src/errors.rs index e99af7a..1d5766c 100644 --- a/core/src/errors.rs +++ b/core/src/errors.rs @@ -101,7 +101,11 @@ pub enum CometError { #[from] source: std::num::ParseFloatError, }, - +#[error(transparent)] +BoolFormat { +#[from] +source: std::str::ParseBoolError, +}, #[error(transparent)] Format { #[from] diff --git a/core/src/execution/jni_api.rs b/core/src/execution/jni_api.rs index 1d55d3f..20f98a3 100644 --- a/core/src/execution/jni_api.rs +++ b/core/src/execution/jni_api.rs @@ -42,7 +42,7 @@ use jni::{ }; use std::{collections::HashMap, sync::Arc, task::Poll}; -use super::{serde, utils::SparkArrowConvert}; +use super::{serde, utils::SparkArrowConvert, CometMemoryPool}; use crate::{ errors::{try_unwrap_or_throw, CometError, CometResult}, @@ -103,6 +103,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( iterators: jobjectArray, serialized_query: jbyteArray, metrics_node: JObject, +comet_task_memory_manager_obj: JObject, ) -> jlong { try_unwrap_or_throw(, |mut env| { // Init JVM classes @@ -147,11 +148,13 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( let input_source = 
Arc::new(jni_new_global_ref!(env, input_source)?); input_sources.push(input_source); } +let task_memory_manager = +Arc::new(jni_new_global_ref!(env, comet_task_memory_manager_obj)?); // We need to keep the session context alive. Some session state like temporary // dictionaries are stored in session context. If it is dropped, the temporary // dictionaries will be dropped as well. -let session = prepare_datafusion_session_context()?; +let session = prepare_datafusion_session_context(, task_memory_manager)?; let exec_context = Box::new(ExecutionContext { id, @@ -17
(arrow-datafusion-comet) branch main updated: minor: Remove unnecessary logic (#169)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new a028132 minor: Remove unnecessary logic (#169) a028132 is described below commit a0281327abce260c802bd834a7cef646909377e5 Author: Chao Sun AuthorDate: Tue Mar 5 21:15:35 2024 -0800 minor: Remove unnecessary logic (#169) --- .../org/apache/comet/serde/QueryPlanSerde.scala| 44 -- .../apache/comet/exec/CometAggregateSuite.scala| 2 +- 2 files changed, 8 insertions(+), 38 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 8b4eaa6..56b6690 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1799,43 +1799,13 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { case _ => return None } - val output = mode match { -case CometAggregateMode.Partial => child.output -case CometAggregateMode.Final => - // Assuming `Final` always follows `Partial` aggregation, this find the first - // `Partial` aggregation and get the input attributes from it. - // During finding partial aggregation, we must ensure all traversed op are - // native operators. If not, we should fallback to Spark. 
- var seenNonNativeOp = false - var partialAggInput: Option[Seq[Attribute]] = None - child.transformDown { -case op if !op.isInstanceOf[CometPlan] => - seenNonNativeOp = true - op -case op @ CometHashAggregateExec(_, _, _, _, input, Some(Partial), _, _) => - if (!seenNonNativeOp && partialAggInput.isEmpty) { -partialAggInput = Some(input) - } - op - } - - if (partialAggInput.isDefined) { -partialAggInput.get - } else { -return None - } -case _ => return None - } - - val binding = if (mode == CometAggregateMode.Final) { -// In final mode, the aggregate expressions are bound to the output of the -// child and partial aggregate expressions buffer attributes produced by partial -// aggregation. This is done in Spark `HashAggregateExec` internally. In Comet, -// we don't have to do this because we don't use the merging expression. -false - } else { -true - } + // In final mode, the aggregate expressions are bound to the output of the + // child and partial aggregate expressions buffer attributes produced by partial + // aggregation. This is done in Spark `HashAggregateExec` internally. In Comet, + // we don't have to do this because we don't use the merging expression. 
+ val binding = mode != CometAggregateMode.Final + // `output` is only used when `binding` is true (i.e., non-Final) + val output = child.output val aggExprs = aggregateExpressions.map(aggExprToProto(_, output, binding)) if (childOp.nonEmpty && groupingExprs.forall(_.isDefined) && diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index 3b4fb1c..bc645cb 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -845,7 +845,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { s" SUM(distinct col2) FROM $table group by col1", expectedNumOfCometAggregates) -expectedNumOfCometAggregates = 1 +expectedNumOfCometAggregates = if (cometColumnShuffleEnabled) 2 else 1 checkSparkAnswerAndNumOfAggregates( "SELECT COUNT(col2), MIN(col2), COUNT(DISTINCT col2), SUM(col2)," + s" SUM(DISTINCT col2), COUNT(DISTINCT col2), col1 FROM $table group by col1",
(arrow-datafusion-comet) branch main updated: build: Upload test reports and coverage (#163)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 56ccf6a build: Upload test reports and coverage (#163) 56ccf6a is described below commit 56ccf6a3cc1471ce4445be21a7fe21911e8fd33c Author: advancedxy AuthorDate: Tue Mar 5 12:33:50 2024 +0800 build: Upload test reports and coverage (#163) --- .github/actions/java-test/action.yaml | 24 +--- .github/workflows/pr_build.yml| 12 +++- pom.xml | 28 ++-- 3 files changed, 54 insertions(+), 10 deletions(-) diff --git a/.github/actions/java-test/action.yaml b/.github/actions/java-test/action.yaml index 4b594e6..f82f05b 100644 --- a/.github/actions/java-test/action.yaml +++ b/.github/actions/java-test/action.yaml @@ -18,10 +18,15 @@ name: "Java Test" description: "Run Java tests" inputs: - MAVEN_OPTS: + maven_opts: description: 'Maven options passed to the mvn command' required: false default: '' + upload-test-reports: +description: 'Whether to upload test results including coverage to GitHub' +required: false +default: 'false' + runs: using: "composite" steps: @@ -44,9 +49,22 @@ runs: - name: Run Maven compile shell: bash run: | -./mvnw -B compile test-compile scalafix:scalafix -Psemanticdb ${{ inputs.MAVEN_OPTS }} +./mvnw -B compile test-compile scalafix:scalafix -Psemanticdb ${{ inputs.maven_opts }} - name: Run tests shell: bash run: | -SPARK_HOME=`pwd` ./mvnw -B clean install ${{ inputs.MAVEN_OPTS }} +SPARK_HOME=`pwd` ./mvnw -B clean install ${{ inputs.maven_opts }} + +- name: Upload test results + if: ${{ inputs.upload-test-reports == 'true' }} + uses: actions/upload-artifact@v4 + with: + name: java-test-reports-${{ github.run_id }} + path: "**/target/surefire-reports/*.txt" + retention-days: 7 # 1 week for test reports + overwrite: true + +- name: Upload coverage results + if: ${{ inputs.upload-test-reports == 'true' }} 
+ uses: codecov/codecov-action@v3 # uses v3 as it allows tokenless uploading diff --git a/.github/workflows/pr_build.yml b/.github/workflows/pr_build.yml index e89cc99..1c1baf6 100644 --- a/.github/workflows/pr_build.yml +++ b/.github/workflows/pr_build.yml @@ -69,7 +69,9 @@ jobs: name: Java test steps uses: ./.github/actions/java-test with: - MAVEN_OPTS: -Pspark-${{ matrix.spark-version }} + maven_opts: -Pspark-${{ matrix.spark-version }} + # upload test reports only for java 17 + upload-test-reports: ${{ matrix.java_version == '17' }} linux-test-with-old-spark: strategy: @@ -98,7 +100,7 @@ jobs: - name: Java test steps uses: ./.github/actions/java-test with: - MAVEN_OPTS: -Pspark-${{ matrix.spark-version }} + maven_opts: -Pspark-${{ matrix.spark-version }} macos-test: strategy: @@ -125,7 +127,7 @@ jobs: name: Java test steps uses: ./.github/actions/java-test with: - MAVEN_OPTS: -Pspark-${{ matrix.spark-version }} + maven_opts: -Pspark-${{ matrix.spark-version }} macos-aarch64-test: strategy: @@ -157,7 +159,7 @@ jobs: name: Java test steps uses: ./.github/actions/java-test with: - MAVEN_OPTS: -Pspark-${{ matrix.spark-version }} + maven_opts: -Pspark-${{ matrix.spark-version }} macos-aarch64-test-with-old-spark: strategy: @@ -186,5 +188,5 @@ jobs: name: Java test steps uses: ./.github/actions/java-test with: - MAVEN_OPTS: -Pspark-${{ matrix.spark-version }} + maven_opts: -Pspark-${{ matrix.spark-version }} diff --git a/pom.xml b/pom.xml index 657eb97..f19ce70 100644 --- a/pom.xml +++ b/pom.xml @@ -55,6 +55,7 @@ under the License. 14.0.2 1.9.13 2.43.0 +0.8.11 ${project.basedir}/../core/target/debug darwin x86_64 @@ -84,6 +85,7 @@ under the License. --add-opens=java.base/sun.util.calendar=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false +-ea -Xmx4g -Xss4m ${extraJavaTestArgs} @@ -672,7 +674,6 @@ under the License. 
D org.apache.comet.IntegrationTestSuite --ea -Xmx4g -Xss4m ${extraJavaTestArgs} file:src/test/resources/log4j2.properties @@ -714,7 +715,6 @@ under the License. file:src/test/resources/log4j2.properties --ea -Xmx4g -Xss4m ${extraJavaTestArgs} @@ -772,6 +772,11 @@ under the
(arrow-datafusion-comet) branch main updated: feat: Enable min/max for boolean type (#165)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 852b8cf feat: Enable min/max for boolean type (#165) 852b8cf is described below commit 852b8cfbfc200b37e3d8b4c3324f6e73cf8bf42d Author: Huaxin Gao AuthorDate: Mon Mar 4 16:55:25 2024 -0800 feat: Enable min/max for boolean type (#165) --- .../org/apache/comet/serde/QueryPlanSerde.scala| 2 +- .../apache/comet/exec/CometAggregateSuite.scala| 26 ++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index a412720..b154fb6 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -181,7 +181,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { private def minMaxDataTypeSupported(dt: DataType): Boolean = { dt match { - case _: NumericType | DateType | TimestampType => true + case _: NumericType | DateType | TimestampType | BooleanType => true case _ => false } } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index d64a3a3..c2faa65 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -886,6 +886,32 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("test bool_and/bool_or") { +withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + Seq(true, false).foreach { bosonColumnShuffleEnabled => +withSQLConf( + CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> bosonColumnShuffleEnabled.toString) { + Seq(true, false).foreach 
{ dictionary => +withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { + val table = "test" + withTable(table) { +sql(s"create table $table(a boolean, b int) using parquet") +sql(s"insert into $table values(true, 1)") +sql(s"insert into $table values(false, 2)") +sql(s"insert into $table values(true, 3)") +sql(s"insert into $table values(true, 3)") +// Spark maps BOOL_AND to MIN and BOOL_OR to MAX +checkSparkAnswerAndNumOfAggregates( + s"SELECT MIN(a), MAX(a), BOOL_AND(a), BOOL_OR(a) FROM $table", + 2) + } +} + } +} + } +} + } + protected def checkSparkAnswerAndNumOfAggregates(query: String, numAggregates: Int): Unit = { val df = sql(query) checkSparkAnswer(df)
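The test comment notes that Spark maps BOOL_AND to MIN and BOOL_OR to MAX. A minimal sketch of why that mapping is sound, assuming the usual ordering false < true and a non-empty input with no NULLs (the class and method names are hypothetical):

```java
/** Hypothetical helper comparing MIN/MAX over booleans with AND/OR. */
final class BoolMinMax {
  /** Minimum under the ordering false < true; expects a non-empty array. */
  static boolean min(boolean[] values) {
    boolean result = values[0];
    for (boolean v : values) {
      if (Boolean.compare(v, result) < 0) {
        result = v;
      }
    }
    return result;
  }

  /** Maximum under the ordering false < true; expects a non-empty array. */
  static boolean max(boolean[] values) {
    boolean result = values[0];
    for (boolean v : values) {
      if (Boolean.compare(v, result) > 0) {
        result = v;
      }
    }
    return result;
  }

  /** Logical AND of all values. */
  static boolean boolAnd(boolean[] values) {
    boolean result = true;
    for (boolean v : values) {
      result &= v;
    }
    return result;
  }

  /** Logical OR of all values. */
  static boolean boolOr(boolean[] values) {
    boolean result = false;
    for (boolean v : values) {
      result |= v;
    }
    return result;
  }
}
```

For the rows inserted by the test (true, false, true, true), the minimum is false because at least one value is false, which is exactly when AND is false. The maximum is true because at least one value is true, which is exactly when OR is true.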
(arrow-datafusion-comet) branch main updated: chore: Fix warnings in both compiler and test environments (#164)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 72398a6 chore: Fix warnings in both compiler and test environments (#164) 72398a6 is described below commit 72398a68a44ec38dfe21b3d76cabeace1bfca0cf Author: advancedxy AuthorDate: Tue Mar 5 01:39:58 2024 +0800 chore: Fix warnings in both compiler and test environments (#164) --- common/src/main/java/org/apache/comet/parquet/BatchReader.java| 1 + core/src/lib.rs | 3 ++- pom.xml | 4 ++-- .../src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala | 2 +- spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 4 ++-- .../main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala | 4 ++-- spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala | 1 - spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala | 2 +- 8 files changed, 11 insertions(+), 10 deletions(-) diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java index 87302b3..9940390 100644 --- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java @@ -517,6 +517,7 @@ public class BatchReader extends RecordReader implements Cl } } + @SuppressWarnings("deprecation") private boolean loadNextRowGroupIfNecessary() throws Throwable { // More rows can be read from loaded row group. No need to load next one. 
if (rowsRead != totalRowsLoaded) return true; diff --git a/core/src/lib.rs b/core/src/lib.rs index d104788..2e85136 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -34,7 +34,7 @@ use jni::{ }; use log::{info, LevelFilter}; use log4rs::{ -append::console::ConsoleAppender, +append::console::{ConsoleAppender, Target}, config::{load_config_file, Appender, Deserializers, Root}, encode::pattern::PatternEncoder, Config, @@ -99,6 +99,7 @@ const LOG_PATTERN: = "{d(%y/%m/%d %H:%M:%S)} {l} {f}: {m}{n}"; // Creates a default log4rs config, which logs to console with `INFO` level. fn default_logger_config() -> CometResult { let console_append = ConsoleAppender::builder() +.target(Target::Stderr) .encoder(Box::new(PatternEncoder::new(LOG_PATTERN))) .build(); let appender = Appender::builder().build("console", Box::new(console_append)); diff --git a/pom.xml b/pom.xml index aa59d19..657eb97 100644 --- a/pom.xml +++ b/pom.xml @@ -711,9 +711,9 @@ under the License. maven-surefire-plugin 3.1.0 - + file:src/test/resources/log4j2.properties - + -ea -Xmx4g -Xss4m ${extraJavaTestArgs} diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 8037f55..87c2265 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -115,7 +115,7 @@ class CometSparkSessionExtensions // data source V1 case scanExec @ FileSourceScanExec( HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _), -_: Seq[AttributeReference], +_: Seq[_], requiredSchema, _, _, diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 75a2ff9..a412720 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1472,14 
+1472,14 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { // With Spark 3.4, CharVarcharCodegenUtils.readSidePadding gets called to pad spaces for char // types. Use rpad to achieve the behavior. See https://github.com/apache/spark/pull/38151 case StaticInvoke( -_: Class[CharVarcharCodegenUtils], +clz: Class[_], _: StringType, "readSidePadding", arguments, _, true, false, -true) if arguments.size == 2 => +true) if clz == classOf[CharVarcharCodegenUtils] && arguments.size == 2 => val argsExpr = Seq( exprToProtoInternal(Cast(arguments(0), StringType), inputs), exprToProtoInternal(argu
(arrow-datafusion-comet) branch main updated (f67f613 -> 0ad78cd)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git from f67f613 fix: rat check error in vscode ide (#161) add 0ad78cd minor: Only trigger PR title checker on pull requests (#154) No new revisions were added by this update. Summary of changes: .github/workflows/pr_build.yml | 15 -- .../workflows/pr_title_check.yml | 34 -- 2 files changed, 25 insertions(+), 24 deletions(-) copy core/rustfmt.toml => .github/workflows/pr_title_check.yml (50%)
(arrow-datafusion-comet) branch main updated: build: Support CI pipelines for Spark 3.2, 3.3 and 3.4 (#153)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new a3dd8df build: Support CI pipelines for Spark 3.2, 3.3 and 3.4 (#153) a3dd8df is described below commit a3dd8df22f016776f227721a31bfaa8bf730ce03 Author: advancedxy AuthorDate: Sun Mar 3 13:58:10 2024 +0800 build: Support CI pipelines for Spark 3.2, 3.3 and 3.4 (#153) --- .github/actions/java-test/action.yaml | 11 ++- .github/workflows/pr_build.yml | 80 +++--- .../apache/comet/CometSparkSessionExtensions.scala | 4 ++ .../org/apache/comet/CometExpressionSuite.scala| 3 +- .../org/apache/comet/exec/CometExecSuite.scala | 3 + .../scala/org/apache/spark/sql/CometTestBase.scala | 2 +- 6 files changed, 90 insertions(+), 13 deletions(-) diff --git a/.github/actions/java-test/action.yaml b/.github/actions/java-test/action.yaml index 6c3af79..4b594e6 100644 --- a/.github/actions/java-test/action.yaml +++ b/.github/actions/java-test/action.yaml @@ -15,6 +15,13 @@ # specific language governing permissions and limitations # under the License. 
+name: "Java Test" +description: "Run Java tests" +inputs: + MAVEN_OPTS: +description: 'Maven options passed to the mvn command' +required: false +default: '' runs: using: "composite" steps: @@ -37,9 +44,9 @@ runs: - name: Run Maven compile shell: bash run: | -./mvnw -B compile test-compile scalafix:scalafix -Psemanticdb +./mvnw -B compile test-compile scalafix:scalafix -Psemanticdb ${{ inputs.MAVEN_OPTS }} - name: Run tests shell: bash run: | -SPARK_HOME=`pwd` ./mvnw -B clean install +SPARK_HOME=`pwd` ./mvnw -B clean install ${{ inputs.MAVEN_OPTS }} diff --git a/.github/workflows/pr_build.yml b/.github/workflows/pr_build.yml index d905095..b322c89 100644 --- a/.github/workflows/pr_build.yml +++ b/.github/workflows/pr_build.yml @@ -44,13 +44,14 @@ jobs: os: [ubuntu-latest] java_version: [8, 11, 17] test-target: [rust, java] +spark-version: ['3.4'] is_push_event: - ${{ github.event_name == 'push' }} exclude: # exclude java 11 for pull_request event - java_version: 11 is_push_event: false fail-fast: false -name: ${{ matrix.test-target }} test on ${{ matrix.os }} with java ${{ matrix.java_version }} +name: ${{ matrix.os }}/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}/${{ matrix.test-target }} runs-on: ${{ matrix.os }} container: image: amd64/rust @@ -61,14 +62,43 @@ jobs: with: rust-version: ${{env.RUST_VERSION}} jdk-version: ${{ matrix.java_version }} - - - uses: actions/checkout@v4 - if: matrix.test-target == 'rust' name: Rust test steps uses: ./.github/actions/rust-test - if: matrix.test-target == 'java' name: Java test steps uses: ./.github/actions/java-test +with: + MAVEN_OPTS: -Pspark-${{ matrix.spark-version }} + + linux-test-with-old-spark: +strategy: + matrix: +os: [ubuntu-latest] +java_version: [8, 11, 17] +test-target: [java] +spark-version: ['3.2', '3.3'] +exclude: + - java_version: 17 +spark-version: '3.2' + - java_version: 11 +spark-version: '3.2' + fail-fast: false +name: ${{ matrix.os }}/java ${{ matrix.java_version 
}}-spark-${{matrix.spark-version}}/${{ matrix.test-target }} +runs-on: ${{ matrix.os }} +container: + image: amd64/rust +steps: + - uses: actions/checkout@v4 + - name: Setup Rust & Java toolchain +uses: ./.github/actions/setup-builder +with: + rust-version: ${{env.RUST_VERSION}} + jdk-version: ${{ matrix.java_version }} + - name: Java test steps +uses: ./.github/actions/java-test +with: + MAVEN_OPTS: -Pspark-${{ matrix.spark-version }} macos-test: strategy: @@ -76,9 +106,10 @@ jobs: os: [macos-13] java_version: [8, 11, 17] test-target: [rust, java] +spark-version: ['3.4'] fail-fast: false if: github.event_name == 'push' -name: ${{ matrix.test-target }} test on ${{ matrix.os }} with java ${{ matrix.java_version }} +name: ${{ matrix.os }}/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}/${{ matrix.test-target }} runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v4 @@ -87,27 +118,28 @@ jobs: with: rust-version: ${{env.RUST_VERSION}} jdk-version: ${{ matrix.java_version }} - - - uses: actions/checkout@v4 - if: matrix.t
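The `exclude` entries in the `linux-test-with-old-spark` matrix above are easier to follow when the combinations are listed explicitly. The following is only a sketch of how the matrix might expand, assuming the Java and Spark versions shown in the diff; the function name `expand_old_spark_matrix` is hypothetical and is not part of the workflow.

```shell
# Hypothetical helper: print the java/spark pairs that the
# linux-test-with-old-spark job would run, plus the Maven profile
# passed through MAVEN_OPTS for each pair.
expand_old_spark_matrix() {
  for spark in 3.2 3.3; do
    for java in 8 11 17; do
      # Mirror the two `exclude` entries: Spark 3.2 runs with Java 8 only.
      if [ "$spark" = "3.2" ] && [ "$java" != "8" ]; then
        continue
      fi
      echo "java ${java}-spark-${spark} -> -Pspark-${spark}"
    done
  done
}

expand_old_spark_matrix
```

Under these assumptions four combinations remain: Java 8 with Spark 3.2, and Java 8, 11 and 17 with Spark 3.3.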
(arrow-datafusion-comet) branch main updated: build: Add checker for PR title (#151)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git

The following commit(s) were added to refs/heads/main by this push:
     new cad16d5  build: Add checker for PR title (#151)
cad16d5 is described below

commit cad16d53f7076d6c115613a31afdc1fbc975aa87
Author: Chao Sun
AuthorDate: Sat Mar 2 11:22:55 2024 -0800

    build: Add checker for PR title (#151)
---
 .github/workflows/pr_build.yml | 15 +++
 1 file changed, 15 insertions(+)

diff --git a/.github/workflows/pr_build.yml b/.github/workflows/pr_build.yml
index aa69aea..d905095 100644
--- a/.github/workflows/pr_build.yml
+++ b/.github/workflows/pr_build.yml
@@ -126,3 +126,18 @@ jobs:
       - if: matrix.test-target == 'java'
         name: Java test steps
         uses: ./.github/actions/java-test
+  check-pr-title:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+      - name: Check PR title
+        env:
+          PR_TITLE: ${{ github.event.pull_request.title }}
+        run: |
+          if ! echo $PR_TITLE | grep -Eq '^(\w+)(\(.+\))?: .+$'; then
+            echo "PR title does not follow conventional commit style."
+            echo "Please use a title in the format: type: message, or type(scope): message"
+            echo "Example: feat: Add support for sort-merge join"
+            exit 1
+          fi
+
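The title check added in this commit can be tried locally before opening a PR. The following is a minimal sketch, assuming a POSIX shell with `grep -E`; the function name `check_pr_title` is hypothetical, and `[[:alnum:]_]+` is used as a portable stand-in for the `\w+` in the workflow's pattern.

```shell
# Hypothetical local version of the "Check PR title" step.
# Succeeds (exit status 0) when the title looks like
# "type: message" or "type(scope): message".
check_pr_title() {
  # printf with a quoted argument avoids word splitting and globbing
  # of titles that contain spaces or wildcard characters.
  printf '%s\n' "$1" | grep -Eq '^[[:alnum:]_]+(\(.+\))?: .+$'
}

if check_pr_title "feat: Add support for sort-merge join"; then
  echo "title accepted"
fi
```

A title without a `type:` prefix, such as `Add checker for PR title`, makes the function return a non-zero status.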
(arrow-datafusion-comet) branch main updated: test: Add golden files for test (#150)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 23c748b test: Add golden files for test (#150) 23c748b is described below commit 23c748b7fe4a4a125f10a1af3a36ba488ae6a0e2 Author: Steve Vaughan AuthorDate: Fri Mar 1 18:47:38 2024 -0500 test: Add golden files for test (#150) Co-authored-by: Steve Vaughan --- core/src/errors.rs | 1 - core/testdata/backtrace.txt | 22 ++ core/testdata/stacktrace.txt | 12 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/core/src/errors.rs b/core/src/errors.rs index 16ed7c3..e99af7a 100644 --- a/core/src/errors.rs +++ b/core/src/errors.rs @@ -651,7 +651,6 @@ mod tests { /// See [`object_panic_exception`] for a test which involves generating a panic and verifying /// that the resulting stack trace includes the offending call. #[test] -#[ignore] pub fn stacktrace_string() { // Setup: Start with a backtrace that includes all of the expected scenarios, including // cases where the file and location are not provided as part of the backtrace capture diff --git a/core/testdata/backtrace.txt b/core/testdata/backtrace.txt new file mode 100644 index 000..54ea9e8 --- /dev/null +++ b/core/testdata/backtrace.txt @@ -0,0 +1,22 @@ + 0: std::backtrace_rs::backtrace::libunwind::trace + at /rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/../../backtrace/src/backtrace/libunwind.rs:93:5 + 1: std::backtrace_rs::backtrace::trace_unsynchronized + at /rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/../../backtrace/src/backtrace/mod.rs:66:5 + 2: std::backtrace::Backtrace::create + at /rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/backtrace.rs:331:13 + 3: std::backtrace::Backtrace::capture + at /rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/backtrace.rs:297:9 + 4: 
comet::Java_org_apache_comet_NativeBase_init::{{closure}} + at /Users/somebody/src/arrow-datafusion-comet/core/src/lib.rs:70:77 + 5: std::panicking::try::do_call + at /rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/panicking.rs:526:40 + 6: ___rust_try + 7: std::panicking::try + at /rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/panicking.rs:490:19 + 8: std::panic::catch_unwind + at /rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/panic.rs:142:14 + 9: comet::errors::try_unwrap_or_throw + at /Users/somebody/src/arrow-datafusion-comet/core/src/errors.rs:369:43 + 10: Java_org_apache_comet_NativeBase_init + at /Users/somebody/src/arrow-datafusion-comet/core/src/lib.rs:53:5 + diff --git a/core/testdata/stacktrace.txt b/core/testdata/stacktrace.txt new file mode 100644 index 000..8b825fa --- /dev/null +++ b/core/testdata/stacktrace.txt @@ -0,0 +1,12 @@ +Some Error Message +at std::backtrace_rs::backtrace::libunwind::trace(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/../../backtrace/src/backtrace/libunwind.rs:93) +at std::backtrace_rs::backtrace::trace_unsynchronized(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/../../backtrace/src/backtrace/mod.rs:66) +at std::backtrace::Backtrace::create(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/backtrace.rs:331) +at std::backtrace::Backtrace::capture(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/backtrace.rs:297) +at comet::Java_org_apache_comet_NativeBase_init::{{closure}}(/Users/somebody/src/arrow-datafusion-comet/core/src/lib.rs:70) +at std::panicking::try::do_call(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/panicking.rs:526) +at ___rust_try(__internal__:0) +at std::panicking::try(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/panicking.rs:490) +at std::panic::catch_unwind(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/panic.rs:142) +at 
comet::errors::try_unwrap_or_throw(/Users/somebody/src/arrow-datafusion-comet/core/src/errors.rs:369) +at Java_org_apache_comet_NativeBase_init(/Users/somebody/src/arrow-datafusion-comet/core/src/lib.rs:53)
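The two golden files above are related by a simple mapping: each numbered backtrace frame, together with its optional `at file:line:column` line, becomes one Java-style `at symbol(file:line)` entry, and frames without a location appear as `(__internal__:0)`. The following is only a sketch of that mapping written with `awk`; it is not the conversion code in `core/src/errors.rs`, the function name `to_stacktrace` is hypothetical, and it does not emit the leading error message line.

```shell
# Hypothetical illustration of the backtrace.txt -> stacktrace.txt mapping.
# Reads a Rust backtrace on stdin and prints one "at symbol(file:line)"
# entry per frame.
to_stacktrace() {
  awk '
    # Print the frame collected so far, if any.
    function emit() { if (sym != "") print "at " sym "(" loc ")" }

    # Frame header, e.g. " 6: ___rust_try"
    /^[ \t]*[0-9]+: / {
      emit()
      sub(/^[ \t]*[0-9]+: /, "")
      sym = $0
      loc = "__internal__:0"   # default when no location line follows
      next
    }

    # Location line, e.g. " at /path/to/lib.rs:70:77"
    /^[ \t]*at / {
      sub(/^[ \t]*at /, "")
      sub(/:[0-9]+$/, "")      # drop the column, keep file:line
      loc = $0
      next
    }

    END { emit() }
  '
}

printf ' 0: comet::errors::try_unwrap_or_throw\n at /src/errors.rs:369:43\n 1: ___rust_try\n' | to_stacktrace
```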
(arrow-datafusion-comet) branch main updated: Minor: Update README.md with system diagram (#148)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 1f53e25 Minor: Update README.md with system diagram (#148) 1f53e25 is described below commit 1f53e25505b6c646acfbc813bcd6af06f1390cb0 Author: Andrew Lamb AuthorDate: Fri Mar 1 17:16:19 2024 -0500 Minor: Update README.md with system diagram (#148) --- README.md| 9 - doc/comet-system-diagram.png | Bin 0 -> 30027 bytes 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 5fb90be..572a9d2 100644 --- a/README.md +++ b/README.md @@ -22,13 +22,20 @@ under the License. Comet is an Apache Spark plugin that uses [Apache Arrow DataFusion](https://arrow.apache.org/datafusion/) as native runtime to achieve improvement in terms of query efficiency and query runtime. -On a high level, Comet aims to support: +Comet runs Spark SQL queries using the native DataFusion runtime, which is +typically faster and more resource efficient than JVM based runtimes. + + + +Comet aims to support: - a native Parquet implementation, including both reader and writer - full implementation of Spark operators, including Filter/Project/Aggregation/Join/Exchange etc. - full implementation of Spark built-in expressions - a UDF framework for users to migrate their existing UDF to native +## Architecture + The following diagram illustrates the architecture of Comet: diff --git a/doc/comet-system-diagram.png b/doc/comet-system-diagram.png new file mode 100644 index 000..e7c9075 Binary files /dev/null and b/doc/comet-system-diagram.png differ
(arrow-datafusion-comet) branch main updated: build: Make the build system work out of box (#136)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 0b73c15 build: Make the build system work out of box (#136) 0b73c15 is described below commit 0b73c15c0b786401209ef931df79be9e14266411 Author: advancedxy AuthorDate: Fri Mar 1 23:57:52 2024 +0800 build: Make the build system work out of box (#136) --- .github/actions/java-test/action.yaml | 13 ++ .github/actions/rust-test/action.yaml | 18 ++ .github/workflows/pr_build.yml| 6 - DEVELOPMENT.md| 23 +- Makefile | 15 +--- pom.xml | 46 --- 6 files changed, 69 insertions(+), 52 deletions(-) diff --git a/.github/actions/java-test/action.yaml b/.github/actions/java-test/action.yaml index ae6d80d..6c3af79 100644 --- a/.github/actions/java-test/action.yaml +++ b/.github/actions/java-test/action.yaml @@ -37,18 +37,9 @@ runs: - name: Run Maven compile shell: bash run: | -echo "JAVA_VERSION=${JAVA_VERSION}" -if [ $JAVA_VERSION == "1.8" ]; then - ./mvnw -B compile test-compile scalafix:scalafix -Psemanticdb -Djava.version=${JAVA_VERSION} -Dspotless.version=2.29.0 -else - ./mvnw -B compile test-compile scalafix:scalafix -Psemanticdb -Djava.version=${JAVA_VERSION} -fi +./mvnw -B compile test-compile scalafix:scalafix -Psemanticdb - name: Run tests shell: bash run: | -if [ $JAVA_VERSION == "1.8" ]; then - SPARK_HOME=`pwd` ./mvnw -B clean install -Djava.version=${JAVA_VERSION} -Dspotless.version=2.29.0 -else - SPARK_HOME=`pwd` ./mvnw -B clean install -Djava.version=${JAVA_VERSION} -fi +SPARK_HOME=`pwd` ./mvnw -B clean install diff --git a/.github/actions/rust-test/action.yaml b/.github/actions/rust-test/action.yaml index cb6c29c..bf0d0ba 100644 --- a/.github/actions/rust-test/action.yaml +++ b/.github/actions/rust-test/action.yaml @@ -51,24 +51,10 @@ runs: shell: bash run: | cd common -if [ $JAVA_VERSION == "1.8" ]; then - ../mvnw 
-B clean compile -DskipTests -Djava.version=${JAVA_VERSION} -Dspotless.version=2.29.0 -else - ../mvnw -B clean compile -DskipTests -Djava.version=${JAVA_VERSION} -fi +../mvnw -B clean compile -DskipTests - name: Run Cargo test shell: bash run: | cd core -# This is required to run some JNI related tests on the Rust side - JAVA_LD_LIBRARY_PATH=$JAVA_HOME/lib/server:$JAVA_HOME/lib:$JAVA_HOME/lib/jli -# special handing for java 1.8 for both linux and mac distributions -if [ $JAVA_VERSION == "8" ]; then - JAVA_LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/jli:$JAVA_HOME/jre/lib/server:$JAVA_HOME/jre/lib:$JAVA_HOME/jre/lib/jli -fi -RUST_BACKTRACE=1 \ -LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$JAVA_LD_LIBRARY_PATH \ -DYLD_LIBRARY_PATH=$DYLD_LIBRARY_PATH:$JAVA_LD_LIBRARY_PATH \ -cargo test - +RUST_BACKTRACE=1 cargo test diff --git a/.github/workflows/pr_build.yml b/.github/workflows/pr_build.yml index 316532c..aa69aea 100644 --- a/.github/workflows/pr_build.yml +++ b/.github/workflows/pr_build.yml @@ -54,8 +54,6 @@ jobs: runs-on: ${{ matrix.os }} container: image: amd64/rust -env: - JAVA_VERSION: ${{ matrix.java_version == 8 && '1.8' || format('{0}', matrix.java_version) }} steps: - uses: actions/checkout@v4 - name: Setup Rust & Java toolchain @@ -82,8 +80,6 @@ jobs: if: github.event_name == 'push' name: ${{ matrix.test-target }} test on ${{ matrix.os }} with java ${{ matrix.java_version }} runs-on: ${{ matrix.os }} -env: - JAVA_VERSION: ${{ matrix.java_version == 8 && '1.8' || format('{0}', matrix.java_version) }} steps: - uses: actions/checkout@v4 - name: Setup Rust & Java toolchain @@ -113,8 +109,6 @@ jobs: fail-fast: false name: ${{ matrix.test-target }} test on macos-aarch64 with java ${{ matrix.java_version }} runs-on: macos-14 -env: - JAVA_VERSION: ${{ matrix.java_version == 8 && '1.8' || format('{0}', matrix.java_version) }} steps: - uses: actions/checkout@v4 - name: Setup Rust & Java toolchain diff --git a/DEVELOPMENT.md 
b/DEVELOPMENT.md index 1793bb9..25a6599 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -30,7 +30,7 @@ under the License. ## Development Setup -1. Make sure `JAVA_HOME` is set and point to JDK 11 installation. +1. Make sur
(arrow-datafusion-comet) branch main updated: build: Refine names in benchmark.yml (#132)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 7b155f2 build: Refine names in benchmark.yml (#132) 7b155f2 is described below commit 7b155f27c366a6319455f14e716408a707520309 Author: advancedxy AuthorDate: Fri Mar 1 01:16:54 2024 +0800 build: Refine names in benchmark.yml (#132) --- .github/workflows/benchmark.yml | 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index adfa1ae..8b3ae7c 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -name: Run TPC-DS Benchmark +name: TPC-DS Correctness concurrency: group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }} @@ -39,7 +39,7 @@ env: jobs: prepare: -name: Build native lib and prepare TPC-DS data +name: Build native and prepare data runs-on: ubuntu-latest container: image: amd64/rust @@ -97,7 +97,7 @@ jobs: cd .. benchmark: -name: Run TPC-DS benchmark +name: Run TPCDSQuerySuite runs-on: ubuntu-latest needs: [prepare] container: @@ -123,7 +123,7 @@ jobs: ${{ runner.os }}-java-maven- - name: Restore TPC-DS generated data id: cache-tpcds-sf-1 -uses: actions/cache@v4 +uses: actions/cache/restore@v4 with: path: ./tpcds-sf-1 key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml') }}
(arrow-datafusion-comet) branch main updated: build: Re-enable TPCDS queries (#133)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git

The following commit(s) were added to refs/heads/main by this push:
     new ffc2dd2  build: Re-enable TPCDS queries (#133)
ffc2dd2 is described below

commit ffc2dd2efd5920f77cb75b1fde91e1828e80dd4c
Author: Liang-Chi Hsieh
AuthorDate: Thu Feb 29 09:16:35 2024 -0800

    build: Re-enable TPCDS queries (#133)
---
 spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala
index 5d7b603..65ea8ba 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala
@@ -27,7 +27,7 @@ class CometTPCDSQuerySuite extends {
   // This is private in `TPCDSBase`.
   val excludedTpcdsQueries: Seq[String] =
-    Seq("q34", "q66", "q64", "q71", "q88", "q90", "q96")
+    Seq("q66", "q71", "q88", "q90", "q96")
   // This is private in `TPCDSBase` and `excludedTpcdsQueries` is private too.
   // So we cannot override `excludedTpcdsQueries` to exclude the queries.
(arrow-datafusion-comet) branch main updated: build: Separate and speedup TPC-DS benchmark (#130)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new e2a6aca build: Separate and speedup TPC-DS benchmark (#130) e2a6aca is described below commit e2a6acaf9b1a318f6ffee2f0a2a0d5899be1e348 Author: advancedxy AuthorDate: Thu Feb 29 08:56:16 2024 +0800 build: Separate and speedup TPC-DS benchmark (#130) --- .github/workflows/{pr_build.yml => benchmark.yml} | 169 +- .github/workflows/pr_build.yml| 61 2 files changed, 66 insertions(+), 164 deletions(-) diff --git a/.github/workflows/pr_build.yml b/.github/workflows/benchmark.yml similarity index 53% copy from .github/workflows/pr_build.yml copy to .github/workflows/benchmark.yml index fe4dd04..adfa1ae 100644 --- a/.github/workflows/pr_build.yml +++ b/.github/workflows/benchmark.yml @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. 
-name: PR Build +name: Run TPC-DS Benchmark concurrency: group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }} @@ -38,103 +38,8 @@ env: RUST_VERSION: nightly jobs: - linux-test: -strategy: - matrix: -os: [ubuntu-latest] -java_version: [8, 11, 17] -test-target: [rust, java] -is_push_event: - - ${{ github.event_name == 'push' }} -exclude: # exclude java 11 for pull_request event - - java_version: 11 -is_push_event: false - fail-fast: false -name: ${{ matrix.test-target }} test on ${{ matrix.os }} with java ${{ matrix.java_version }} -runs-on: ${{ matrix.os }} -container: - image: amd64/rust -env: - JAVA_VERSION: ${{ matrix.java_version == 8 && '1.8' || format('{0}', matrix.java_version) }} -steps: - - uses: actions/checkout@v4 - - name: Setup Rust & Java toolchain -uses: ./.github/actions/setup-builder -with: - rust-version: ${{env.RUST_VERSION}} - jdk-version: ${{ matrix.java_version }} - - - uses: actions/checkout@v4 - - if: matrix.test-target == 'rust' -name: Rust test steps -uses: ./.github/actions/rust-test - - if: matrix.test-target == 'java' -name: Java test steps -uses: ./.github/actions/java-test - - macos-test: -strategy: - matrix: -os: [macos-13] -java_version: [8, 11, 17] -test-target: [rust, java] - fail-fast: false -if: github.event_name == 'push' -name: ${{ matrix.test-target }} test on ${{ matrix.os }} with java ${{ matrix.java_version }} -runs-on: ${{ matrix.os }} -env: - JAVA_VERSION: ${{ matrix.java_version == 8 && '1.8' || format('{0}', matrix.java_version) }} -steps: - - uses: actions/checkout@v4 - - name: Setup Rust & Java toolchain -uses: ./.github/actions/setup-macos-builder -with: - rust-version: ${{env.RUST_VERSION}} - jdk-version: ${{ matrix.java_version }} - - - uses: actions/checkout@v4 - - if: matrix.test-target == 'rust' -name: Rust test steps -uses: ./.github/actions/rust-test - - if: matrix.test-target == 'java' -name: Java test steps -uses: ./.github/actions/java-test - - macos-aarch64-test: 
-strategy: - matrix: -java_version: [8, 11, 17] -test-target: [rust, java] -is_push_event: - - ${{ github.event_name == 'push' }} -exclude: # exclude java 11 for pull_request event - - java_version: 11 -is_push_event: false - fail-fast: false -name: ${{ matrix.test-target }} test on macos-aarch64 with java ${{ matrix.java_version }} -runs-on: macos-14 -env: - JAVA_VERSION: ${{ matrix.java_version == 8 && '1.8' || format('{0}', matrix.java_version) }} -steps: - - uses: actions/checkout@v4 - - name: Setup Rust & Java toolchain -uses: ./.github/actions/setup-macos-builder -with: - rust-version: ${{env.RUST_VERSION}} - jdk-version: ${{ matrix.java_version }} - jdk-architecture: aarch64 - protoc-architecture: aarch_64 - - - uses: actions/checkout@v4 - - if: matrix.test-target == 'rust' -name: Rust test steps -uses: ./.github/actions/rust-test - - if: matrix.test-target == 'java' -name: Java test steps -uses: ./.github/actions/java-test - - tpcds-1g: -name: Run TPC-DS queries with SF=1 + prepare: +name: Build native lib and prepare TPC-DS data runs-on: ubuntu-latest container: image: amd64/rust @@ -147,13 +52,22 @@ jobs: with: rust-version: ${{env.RUST_VERSION}} jdk-version: 11 + - name:
(arrow-datafusion-comet) branch main updated: test: Reduce end-to-end test time (#109)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new e306fc6 test: Reduce end-to-end test time (#109) e306fc6 is described below commit e306fc656ec41f814050a85fdb24d607e2a64147 Author: Chao Sun AuthorDate: Wed Feb 28 16:21:21 2024 -0800 test: Reduce end-to-end test time (#109) --- .../apache/comet/exec/CometAggregateSuite.scala| 27 +- ...Suite.scala => CometColumnarShuffleSuite.scala} | 683 + .../comet/exec/CometNativeShuffleSuite.scala | 216 +++ 3 files changed, 381 insertions(+), 545 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index 04735b5..d64a3a3 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -537,23 +537,18 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf(CometConf.COMET_BATCH_SIZE.key -> batchSize.toString) { // Test all combinations of different aggregation & group-by types -(1 to 4).foreach { col => - (1 to 14).foreach { gCol => -withView("v") { - sql(s"CREATE TEMP VIEW v AS SELECT _g$gCol, _$col FROM tbl ORDER BY _$col") - checkSparkAnswer(s"SELECT _g$gCol, FIRST(_$col) FROM v GROUP BY _g$gCol") - checkSparkAnswer(s"SELECT _g$gCol, LAST(_$col) FROM v GROUP BY _g$gCol") -} -checkSparkAnswer(s"SELECT _g$gCol, SUM(_$col) FROM tbl GROUP BY _g$gCol") -checkSparkAnswer( - s"SELECT _g$gCol, SUM(DISTINCT _$col) FROM tbl GROUP BY _g$gCol") -checkSparkAnswer(s"SELECT _g$gCol, COUNT(_$col) FROM tbl GROUP BY _g$gCol") -checkSparkAnswer( - s"SELECT _g$gCol, COUNT(DISTINCT _$col) FROM tbl GROUP BY _g$gCol") -checkSparkAnswer( - s"SELECT _g$gCol, MIN(_$col), MAX(_$col) FROM tbl GROUP BY _g$gCol") 
-checkSparkAnswer(s"SELECT _g$gCol, AVG(_$col) FROM tbl GROUP BY _g$gCol") +(1 to 14).foreach { gCol => + withView("v") { +sql(s"CREATE TEMP VIEW v AS SELECT _g$gCol, _1, _2, _3, _4 " + + "FROM tbl ORDER BY _1, _2, _3, _4") +checkSparkAnswer(s"SELECT _g$gCol, FIRST(_1), FIRST(_2), FIRST(_3), " + + s"FIRST(_4), LAST(_1), LAST(_2), LAST(_3), LAST(_4) FROM v GROUP BY _g$gCol") } + checkSparkAnswer(s"SELECT _g$gCol, SUM(_1), SUM(_2), COUNT(_3), COUNT(_4), " + +s"MIN(_1), MAX(_4), AVG(_2), AVG(_4) FROM tbl GROUP BY _g$gCol") + checkSparkAnswer(s"SELECT _g$gCol, SUM(DISTINCT _3) FROM tbl GROUP BY _g$gCol") + checkSparkAnswer( +s"SELECT _g$gCol, COUNT(DISTINCT _1) FROM tbl GROUP BY _g$gCol") } } } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala similarity index 54% rename from spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala rename to spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala index 0d7c73d..fec6197 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala @@ -24,28 +24,23 @@ import org.scalatest.Tag import org.apache.hadoop.fs.Path import org.apache.spark.{Partitioner, SparkConf} -import org.apache.spark.sql.{CometTestBase, Row} +import org.apache.spark.sql.{CometTestBase, DataFrame, Row} import org.apache.spark.sql.comet.execution.shuffle.{CometShuffleDependency, CometShuffleExchangeExec, CometShuffleManager} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.functions.col import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.comet.CometConf -import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus - -abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPlanHelper { +abstract class 
CometColumnarShuffleSuite extends CometTestBase with AdaptiveSparkPlan
(arrow-datafusion-comet) branch main updated: feat: Support CollectLimit operator (#100)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 313111d feat: Support CollectLimit operator (#100) 313111d is described below commit 313111d779645f63b5a075aeeef6b0b916c162ee Author: advancedxy AuthorDate: Thu Feb 29 02:19:08 2024 +0800 feat: Support CollectLimit operator (#100) --- .../apache/comet/CometSparkSessionExtensions.scala | 37 +++ .../org/apache/comet/serde/QueryPlanSerde.scala| 1 + .../shims/ShimCometSparkSessionExtensions.scala| 17 .../apache/spark/sql/comet/CometCoalesceExec.scala | 20 +--- .../spark/sql/comet/CometCollectLimitExec.scala| 112 + .../apache/spark/sql/comet/CometExecUtils.scala| 43 .../sql/comet/CometTakeOrderedAndProjectExec.scala | 6 +- .../org/apache/comet/exec/CometExecSuite.scala | 32 +- .../scala/org/apache/spark/sql/CometTestBase.scala | 19 9 files changed, 261 insertions(+), 26 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 10c3328..dae9f3f 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -65,6 +65,9 @@ class CometSparkSessionExtensions case class CometExecColumnar(session: SparkSession) extends ColumnarRule { override def preColumnarTransitions: Rule[SparkPlan] = CometExecRule(session) + +override def postColumnarTransitions: Rule[SparkPlan] = + EliminateRedundantColumnarToRow(session) } case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] { @@ -284,6 +287,20 @@ class CometSparkSessionExtensions op } +case op: CollectLimitExec +if isCometNative(op.child) && isCometOperatorEnabled(conf, "collectLimit") + && isCometShuffleEnabled(conf) + && getOffset(op) == 0 => + 
QueryPlanSerde.operator2Proto(op) match { +case Some(nativeOp) => + val offset = getOffset(op) + val cometOp = +CometCollectLimitExec(op, op.limit, offset, op.child) + CometSinkPlaceHolder(nativeOp, op, cometOp) +case None => + op + } + case op: ExpandExec => val newOp = transform1(op) newOp match { @@ -457,6 +474,26 @@ class CometSparkSessionExtensions } } } + + // CometExec already wraps a `ColumnarToRowExec` for row-based operators. Therefore, + // `ColumnarToRowExec` is redundant and can be eliminated. + // + // It was added during ApplyColumnarRulesAndInsertTransitions' insertTransitions phase when Spark + // requests row-based output such as `collect` call. It's correct to add a redundant + // `ColumnarToRowExec` for `CometExec`. However, for certain operators such as + // `CometCollectLimitExec` which overrides `executeCollect`, the redundant `ColumnarToRowExec` + // makes the override ineffective. The purpose of this rule is to eliminate the redundant + // `ColumnarToRowExec` for such operators. 
+ case class EliminateRedundantColumnarToRow(session: SparkSession) extends Rule[SparkPlan] { +override def apply(plan: SparkPlan): SparkPlan = { + plan match { +case ColumnarToRowExec(child: CometCollectLimitExec) => + child +case other => + other + } +} + } } object CometSparkSessionExtensions extends Logging { diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index fcc0ca9..46eb1b0 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1835,6 +1835,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { case s if isCometScan(s) => true case _: CometSinkPlaceHolder => true case _: CoalesceExec => true + case _: CollectLimitExec => true case _: UnionExec => true case _: ShuffleExchangeExec => true case _: TakeOrderedAndProjectExec => true diff --git a/spark/src/main/scala/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala index 8afed84..85c6413 100644 --- a/spark/src/main/scala/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala @@ -20,9 +20,11 @@ package org.apache.comet.shims import
(arrow-datafusion-comet) branch main updated: doc: Add Quickstart Comet doc section (#125)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git

The following commit(s) were added to refs/heads/main by this push:
     new 27f167b  doc: Add Quickstart Comet doc section (#125)
27f167b is described below

commit 27f167bad4ac2b90457ecce49682ef2932726c3b
Author: comphead
AuthorDate: Tue Feb 27 16:22:54 2024 -0800

    doc: Add Quickstart Comet doc section (#125)

    Co-authored-by: o_voievodin
---
 README.md | 46 ++
 1 file changed, 46 insertions(+)

diff --git a/README.md b/README.md
index f078609..f48dfd9 100644
--- a/README.md
+++ b/README.md
@@ -58,3 +58,49 @@ Linux, Apple OSX (Intel and M1)
 - Apache Spark 3.2, 3.3, or 3.4
 - JDK 8 and up
 - GLIBC 2.17 (Centos 7) and up
+
+## Getting started
+
+Make sure the requirements above are met and software installed on your machine
+
+### Clone repo
+```commandline
+git clone https://github.com/apache/arrow-datafusion-comet.git
+```
+
+### Specify the Spark version and build the Comet
+Spark 3.4 used for the example.
+```
+cd arrow-datafusion-comet
+make release PROFILES="-Pspark-3.4"
+```
+
+### Run Spark with Comet enabled
+Make sure `SPARK_HOME` points to the same Spark version as Comet has built for.
+
+```
+$SPARK_HOME/bin/spark-shell --jars spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar \
+--conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
+--conf spark.comet.enabled=true \
+--conf spark.comet.exec.enabled=true \
+--conf spark.comet.exec.all.enabled=true
+```
+
+### Verify Comet enabled for Spark SQL query
+
+Create a test Parquet source
+```scala
+scala> (0 until 10).toDF("a").write.mode("overwrite").parquet("/tmp/test")
+```
+
+Query the data from the test source and check:
+- INFO message shows the native Comet library has been initialized.
+- The query plan reflects Comet operators being used for this query instead of Spark ones
+```scala
+scala> spark.read.parquet("/tmp/test").createOrReplaceTempView("t1"); spark.sql("select * from t1 where a > 5").explain
+INFO src/lib.rs: Comet native library initialized
+== Physical Plan ==
+*(1) ColumnarToRow
++- CometFilter [a#14], (isnotnull(a#14) AND (a#14 > 5))
+   +- CometScan parquet [a#14] Batched: true, DataFilters: [isnotnull(a#14), (a#14 > 5)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/test], PartitionFilters: [], PushedFilters: [IsNotNull(a), GreaterThan(a,5)], ReadSchema: struct
+```
\ No newline at end of file
(arrow-datafusion-comet) branch main updated (6ec8cb9 -> 0a96145)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git

    from 6ec8cb9  fix: Fix compilation error for Spark 3.2 & 3.3 (#117)
     add 0a96145  test: Move MacOS (x86) pipelines to post-commit (#122)

No new revisions were added by this update.

Summary of changes:
 .github/workflows/pr_build.yml | 11 ++-
 1 file changed, 6 insertions(+), 5 deletions(-)
(arrow-datafusion-comet) branch main updated: fix: Fix compilation error for Spark 3.2 & 3.3 (#117)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git

The following commit(s) were added to refs/heads/main by this push:
     new 6ec8cb9 fix: Fix compilation error for Spark 3.2 & 3.3 (#117)
6ec8cb9 is described below

commit 6ec8cb912b938ef0ad7291f4f3de9ce7a883ae34
Author: Chao Sun
AuthorDate: Mon Feb 26 21:55:14 2024 -0800

    fix: Fix compilation error for Spark 3.2 & 3.3 (#117)
---
 spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 0414671..29b6e12 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
 import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.comet.CometConf
+import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
 
 class CometExecSuite extends CometTestBase {
   import testImplicits._
@@ -1055,6 +1056,7 @@ class CometExecSuite extends CometTestBase {
   }
 
   test("Fallback to Spark for TakeOrderedAndProjectExec with offset") {
+    assume(isSpark34Plus)
     Seq("true", "false").foreach(aqeEnabled =>
       withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
         withTable("t1") {
@@ -1066,7 +1068,7 @@ class CometExecSuite extends CometTestBase {
             .write
             .saveAsTable("t1")
 
-          val df = sql("SELECT * FROM t1 ORDER BY a, b LIMIT 3").offset(1).groupBy($"a").sum("b")
+          val df = sql("SELECT * FROM t1 ORDER BY a, b LIMIT 3 OFFSET 1").groupBy($"a").sum("b")
           checkSparkAnswer(df)
         }
       })
(arrow-datafusion-comet) branch main updated: feat: Add license header by spotless:apply automatically (#110)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new f359ed7 feat: Add license header by spotless:apply automatically (#110) f359ed7 is described below commit f359ed79a3dc710fadbc0484944f280c55c181e1 Author: advancedxy AuthorDate: Tue Feb 27 01:47:02 2024 +0800 feat: Add license header by spotless:apply automatically (#110) --- dev/copyright/java-header.txt | 19 dev/copyright/scala-header.txt | 1 + pom.xml| 6 + .../org/apache/spark/sql/comet/CometPlan.scala | 26 -- .../src/test/scala/org/apache/spark/sql/TPCH.scala | 10 - .../test/scala/org/apache/spark/sql/Tables.scala | 13 +-- .../spark/sql/comet/CometPlanStabilitySuite.scala | 1 + 7 files changed, 52 insertions(+), 24 deletions(-) diff --git a/dev/copyright/java-header.txt b/dev/copyright/java-header.txt new file mode 100644 index 000..bd244d0 --- /dev/null +++ b/dev/copyright/java-header.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + diff --git a/dev/copyright/scala-header.txt b/dev/copyright/scala-header.txt new file mode 12 index 000..372bb40 --- /dev/null +++ b/dev/copyright/scala-header.txt @@ -0,0 +1 @@ +java-header.txt \ No newline at end of file diff --git a/pom.xml b/pom.xml index 5a96eae..d7cd076 100644 --- a/pom.xml +++ b/pom.xml @@ -723,6 +723,9 @@ under the License. java|javax,scala,org,org.apache,com,org.apache.comet,\#,\#org.apache.comet + + ${maven.multiModuleProjectDirectory}/dev/copyright/java-header.txt + @@ -730,6 +733,9 @@ under the License. 3.6.1 ${maven.multiModuleProjectDirectory}/scalafmt.conf + + ${maven.multiModuleProjectDirectory}/dev/copyright/scala-header.txt + diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometPlan.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometPlan.scala index fe2ce7e..e5d268c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometPlan.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometPlan.scala @@ -1,18 +1,20 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at * - *http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language
(arrow-datafusion-comet) branch main updated: build: Show time duration for scala test (#116)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new a2b1f9b build: Show time duration for scala test (#116) a2b1f9b is described below commit a2b1f9bea381eb61355a16cd509e9383a0bf7abc Author: advancedxy AuthorDate: Tue Feb 27 01:46:32 2024 +0800 build: Show time duration for scala test (#116) --- pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/pom.xml b/pom.xml index e46a49a..5a96eae 100644 --- a/pom.xml +++ b/pom.xml @@ -641,6 +641,7 @@ under the License. ${project.build.directory}/surefire-reports . SparkTestSuite.txt +D org.apache.comet.IntegrationTestSuite -ea -Xmx4g -Xss4m ${extraJavaTestArgs}
(arrow-datafusion-comet) branch main updated: fix: Another attempt to fix libcrypto.dylib loading issue (#112)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 7be5a18 fix: Another attempt to fix libcrypto.dylib loading issue (#112) 7be5a18 is described below commit 7be5a1806af6bf213c8cb7739c0263f1fb6ab8fa Author: advancedxy AuthorDate: Tue Feb 27 01:04:52 2024 +0800 fix: Another attempt to fix libcrypto.dylib loading issue (#112) --- .github/actions/setup-macos-builder/action.yaml | 9 + .github/workflows/pr_build.yml | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/.github/actions/setup-macos-builder/action.yaml b/.github/actions/setup-macos-builder/action.yaml index 63010ea..cc1b631 100644 --- a/.github/actions/setup-macos-builder/action.yaml +++ b/.github/actions/setup-macos-builder/action.yaml @@ -49,12 +49,13 @@ runs: unzip $PROTO_ZIP echo "$HOME/d/protoc/bin" >> $GITHUB_PATH export PATH=$PATH:$HOME/d/protoc/bin -# install openssl and setup DYLD_LIBRARY_PATH to work with libcrypto.dylib loading issues with x86_64 mac runners -# see PR https://github.com/apache/arrow-datafusion-comet/pull/55 for more details +# install openssl and setup DYLD_LIBRARY_PATH brew install openssl -OPENSSL_LIB_PATH=$(dirname `brew list openssl | grep 'lib/libcrypto.dylib'`) +OPENSSL_LIB_PATH=`brew --prefix openssl`/lib echo "openssl lib path is: ${OPENSSL_LIB_PATH}" -export DYLD_LIBRARY_PATH=$OPENSSL_LIB_PATH:$DYLD_LIBRARY_PATH +echo "DYLD_LIBRARY_PATH=$OPENSSL_LIB_PATH:$DYLD_LIBRARY_PATH" >> $GITHUB_ENV +# output the current status of SIP for later debugging +csrutil status || true - name: Install JDK ${{inputs.jdk-version}} uses: actions/setup-java@v4 diff --git a/.github/workflows/pr_build.yml b/.github/workflows/pr_build.yml index 669eddd..fe83032 100644 --- a/.github/workflows/pr_build.yml +++ b/.github/workflows/pr_build.yml @@ -70,7 +70,7 @@ jobs: macos-test: 
strategy: matrix: -os: [macos-latest] +os: [macos-13] java_version: [8, 11, 17] test-target: [rust, java] is_push_event:
(arrow-datafusion-comet) branch main updated: fix: Cast string to boolean not compatible with Spark (#107)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 96dfccf fix: Cast string to boolean not compatible with Spark (#107) 96dfccf is described below commit 96dfccf638470407c31b71aaada05d35836e9d93 Author: Eren Avsarogullari AuthorDate: Sun Feb 25 21:00:45 2024 -0800 fix: Cast string to boolean not compatible with Spark (#107) --- core/src/execution/datafusion/expressions/cast.rs | 40 +++--- .../org/apache/comet/CometExpressionSuite.scala| 24 + 2 files changed, 60 insertions(+), 4 deletions(-) diff --git a/core/src/execution/datafusion/expressions/cast.rs b/core/src/execution/datafusion/expressions/cast.rs index d845068..447c277 100644 --- a/core/src/execution/datafusion/expressions/cast.rs +++ b/core/src/execution/datafusion/expressions/cast.rs @@ -27,7 +27,7 @@ use arrow::{ record_batch::RecordBatch, util::display::FormatOptions, }; -use arrow_array::ArrayRef; +use arrow_array::{Array, ArrayRef, BooleanArray, GenericStringArray, OffsetSizeTrait}; use arrow_schema::{DataType, Schema}; use datafusion::logical_expr::ColumnarValue; use datafusion_common::{Result as DataFusionResult, ScalarValue}; @@ -73,10 +73,42 @@ impl Cast { } fn cast_array(, array: ArrayRef) -> DataFusionResult { -let array = array_with_timezone(array, self.timezone.clone(), Some(_type)); +let to_type = _type; +let array = array_with_timezone(array, self.timezone.clone(), Some(to_type)); let from_type = array.data_type(); -let cast_result = cast_with_options(, _type, _OPTIONS)?; -Ok(spark_cast(cast_result, from_type, _type)) +let cast_result = match (from_type, to_type) { +(DataType::Utf8, DataType::Boolean) => Self::spark_cast_utf8_to_boolean::(), +(DataType::LargeUtf8, DataType::Boolean) => { +Self::spark_cast_utf8_to_boolean::() +} +_ => cast_with_options(, to_type, _OPTIONS)?, +}; +let result = 
spark_cast(cast_result, from_type, to_type); +Ok(result) +} + +fn spark_cast_utf8_to_boolean(from: Array) -> ArrayRef +where +OffsetSize: OffsetSizeTrait, +{ +let array = from +.as_any() +.downcast_ref::>() +.unwrap(); + +let output_array = array +.iter() +.map(|value| match value { +Some(value) => match value.to_ascii_lowercase().trim() { +"t" | "true" | "y" | "yes" | "1" => Some(true), +"f" | "false" | "n" | "no" | "0" => Some(false), +_ => None, +}, +_ => None, +}) +.collect::(); + +Arc::new(output_array) } } diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 66ee275..3f29e95 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1302,4 +1302,28 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } } + + test("test cast utf8 to boolean as compatible with Spark") { +def testCastedColumn(inputValues: Seq[String]): Unit = { + val table = "test_table" + withTable(table) { +val values = inputValues.map(x => s"('$x')").mkString(",") +sql(s"create table $table(base_column char(20)) using parquet") +sql(s"insert into $table values $values") +checkSparkAnswerAndOperator( + s"select base_column, cast(base_column as boolean) as casted_column from $table") + } +} + +// Supported boolean values as true by both Arrow and Spark +testCastedColumn(inputValues = Seq("t", "true", "y", "yes", "1", "T", "TrUe", "Y", "YES")) +// Supported boolean values as false by both Arrow and Spark +testCastedColumn(inputValues = Seq("f", "false", "n", "no", "0", "F", "FaLSe", "N", "No")) +// Supported boolean values by Arrow but not Spark +testCastedColumn(inputValues = + Seq("TR", "FA", "tr", "tru", "ye", "on", "fa", "fal", "fals", "of", "off")) +// Invalid boolean casting values for Arrow and Spark +testCastedColumn(inputValues = Seq("car", "Truck")) + } + }
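The string-to-boolean rule introduced by this commit can be tried in isolation. The following is a minimal, std-only sketch of the same match arms, without the Arrow array plumbing; the function name `spark_str_to_bool` is hypothetical and is not part of Comet.

```rust
/// Hypothetical helper that mirrors the match arms of
/// `spark_cast_utf8_to_boolean` in the diff above for a single value.
/// Returns `None` where the cast would produce a null.
fn spark_str_to_bool(value: &str) -> Option<bool> {
    // Lowercase first, then trim surrounding whitespace, as the commit does.
    match value.to_ascii_lowercase().trim() {
        "t" | "true" | "y" | "yes" | "1" => Some(true),
        "f" | "false" | "n" | "no" | "0" => Some(false),
        // Prefixes such as "tru" or words such as "off" are not accepted.
        _ => None,
    }
}
```

Under this rule, mixed-case inputs such as `"TrUe"` map to `Some(true)`, while partial spellings such as `"tru"` or `"off"` map to `None`, which corresponds to the "Supported boolean values by Arrow but not Spark" case in the new test.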
(arrow-datafusion-comet) branch main updated: feat: Support Binary in shuffle writer (#106)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 749731b feat: Support Binary in shuffle writer (#106) 749731b is described below commit 749731b38e98714983154a529625e53319744680 Author: advancedxy AuthorDate: Mon Feb 26 04:38:40 2024 +0800 feat: Support Binary in shuffle writer (#106) --- core/src/execution/datafusion/shuffle_writer.rs| 38 ++ .../org/apache/comet/exec/CometShuffleSuite.scala | 16 + 2 files changed, 54 insertions(+) diff --git a/core/src/execution/datafusion/shuffle_writer.rs b/core/src/execution/datafusion/shuffle_writer.rs index 2032ae6..fc15fac 100644 --- a/core/src/execution/datafusion/shuffle_writer.rs +++ b/core/src/execution/datafusion/shuffle_writer.rs @@ -315,6 +315,9 @@ fn slot_size(len: usize, data_type: ) -> usize { // TODO: this is not accurate, but should be good enough for now slot_size(len, key_type.as_ref()) + slot_size(len / 10, value_type.as_ref()) } +// TODO: this is not accurate, but should be good enough for now +DataType::Binary => len * 100 + len * 4, +DataType::LargeBinary => len * 100 + len * 8, DataType::FixedSizeBinary(s) => len * (*s as usize), DataType::Timestamp(_, _) => len * 8, dt => unimplemented!( @@ -521,6 +524,8 @@ fn append_columns( { append_string_dict!(key_type) } +DataType::Binary => append!(Binary), +DataType::LargeBinary => append!(LargeBinary), DataType::FixedSizeBinary(_) => append_unwrap!(FixedSizeBinary), t => unimplemented!( "{}", @@ -1275,3 +1280,36 @@ impl RecordBatchStream for EmptyStream { self.schema.clone() } } + +#[cfg(test)] +mod test { +use super::*; + +#[test] +fn test_slot_size() { +let batch_size = 1usize; +// not inclusive of all supported types, but enough to test the function +let supported_primitive_types = [ +DataType::Int32, +DataType::Int64, +DataType::UInt32, +DataType::UInt64, 
+DataType::Float32, +DataType::Float64, +DataType::Boolean, +DataType::Utf8, +DataType::LargeUtf8, +DataType::Binary, +DataType::LargeBinary, +DataType::FixedSizeBinary(16), +]; +let expected_slot_size = [4, 8, 4, 8, 4, 8, 1, 104, 108, 104, 108, 16]; +supported_primitive_types +.iter() +.zip(expected_slot_size.iter()) +.for_each(|(data_type, expected)| { +let slot_size = slot_size(batch_size, data_type); +assert_eq!(slot_size, *expected); +}) +} +} diff --git a/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala index beb6dc8..acd424a 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala @@ -1169,6 +1169,22 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla } } + test("fix: comet native shuffle with binary data") { +withSQLConf( + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { +val df = sql("SELECT cast(cast(_1 as STRING) as BINARY) as binary, _2 FROM tbl") + +val shuffled = df.repartition(1, $"binary") + +checkCometExchange(shuffled, 1, true) +checkSparkAnswer(shuffled) + } +} + } + test("Comet shuffle metrics") { withSQLConf( CometConf.COMET_EXEC_ENABLED.key -> "true",
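The memory estimate added for `Binary` and `LargeBinary` is a guess of 100 payload bytes per value plus one offset per value (4 bytes for `Binary`, 8 for `LargeBinary`). A minimal, std-only sketch of that arithmetic follows; the `ColType` enum and `estimated_slot_size` function are hypothetical stand-ins for Arrow's `DataType` and Comet's `slot_size`, and the `Int32`/`Int64` arms are assumptions inferred from the expected values in the new test rather than copied from the source.

```rust
/// Hypothetical stand-in for the few Arrow `DataType` variants discussed here.
#[derive(Debug, Clone, Copy)]
enum ColType {
    Int32,
    Int64,
    Binary,
    LargeBinary,
    FixedSizeBinary(usize),
}

/// Rough per-column memory estimate for `len` values.
fn estimated_slot_size(len: usize, col: ColType) -> usize {
    match col {
        // Assumed fixed-width arms (4 and 8 bytes per value).
        ColType::Int32 => len * 4,
        ColType::Int64 => len * 8,
        // 100 bytes of payload per value is a guess, plus a 4-byte offset.
        ColType::Binary => len * 100 + len * 4,
        // Same payload guess, but with 8-byte offsets.
        ColType::LargeBinary => len * 100 + len * 8,
        // Fixed-width binary needs no offsets.
        ColType::FixedSizeBinary(width) => len * width,
    }
}
```

With `len = 1` this gives 104 for `Binary`, 108 for `LargeBinary`, and 16 for `FixedSizeBinary(16)`, matching the corresponding entries of `expected_slot_size` in the commit's test.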
(arrow-datafusion-comet) branch main updated: fix: Add num_rows when building RecordBatch (#103)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 5b6b546 fix: Add num_rows when building RecordBatch (#103) 5b6b546 is described below commit 5b6b5465cef9ed2b42667229d7ecd7397c44da6d Author: advancedxy AuthorDate: Sun Feb 25 00:23:45 2024 +0800 fix: Add num_rows when building RecordBatch (#103) --- core/src/execution/datafusion/shuffle_writer.rs | 7 +-- core/src/execution/shuffle/row.rs | 9 + spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala | 1 + 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/core/src/execution/datafusion/shuffle_writer.rs b/core/src/execution/datafusion/shuffle_writer.rs index 2e17dcf..2032ae6 100644 --- a/core/src/execution/datafusion/shuffle_writer.rs +++ b/core/src/execution/datafusion/shuffle_writer.rs @@ -272,10 +272,11 @@ impl PartitionBuffer { // active -> staging let active = std::mem::take( self.active); +let num_rows = self.num_active_rows; self.num_active_rows = 0; mem_diff -= self.active_slots_mem_size as isize; -let frozen_batch = make_batch(self.schema.clone(), active)?; +let frozen_batch = make_batch(self.schema.clone(), active, num_rows)?; let frozen_capacity_old = self.frozen.capacity(); let mut cursor = Cursor::new( self.frozen); @@ -1148,9 +1149,11 @@ fn make_dict_builder(datatype: , capacity: usize) -> Box>, +row_count: usize, ) -> ArrowResult { let columns = arrays.iter_mut().map(|array| array.finish()).collect(); -RecordBatch::try_new(schema, columns) +let options = RecordBatchOptions::new().with_row_count(Option::from(row_count)); +RecordBatch::try_new_with_options(schema, columns, ) } /// Checksum algorithms for writing IPC bytes. 
diff --git a/core/src/execution/shuffle/row.rs b/core/src/execution/shuffle/row.rs index e24fbbe..419ef9b 100644 --- a/core/src/execution/shuffle/row.rs +++ b/core/src/execution/shuffle/row.rs @@ -37,7 +37,7 @@ use arrow_array::{ StructBuilder, TimestampMicrosecondBuilder, }, types::Int32Type, -Array, ArrayRef, RecordBatch, +Array, ArrayRef, RecordBatch, RecordBatchOptions, }; use arrow_schema::{DataType, Field, Schema, TimeUnit}; use jni::sys::{jint, jlong}; @@ -3347,7 +3347,7 @@ pub fn process_sorted_row_partition( .zip(schema.iter()) .map(|(builder, datatype)| builder_to_array(builder, datatype, prefer_dictionary_ratio)) .collect(); -let batch = make_batch(array_refs?); +let batch = make_batch(array_refs?, n); let mut frozen: Vec = vec![]; let mut cursor = Cursor::new( frozen); @@ -3420,7 +3420,7 @@ fn builder_to_array( } } -fn make_batch(arrays: Vec) -> RecordBatch { +fn make_batch(arrays: Vec, row_count: usize) -> RecordBatch { let mut dict_id = 0; let fields = arrays .iter() @@ -3441,5 +3441,6 @@ fn make_batch(arrays: Vec) -> RecordBatch { }) .collect::>(); let schema = Arc::new(Schema::new(fields)); -RecordBatch::try_new(schema, arrays).unwrap() +let options = RecordBatchOptions::new().with_row_count(Option::from(row_count)); +RecordBatch::try_new_with_options(schema, arrays, ).unwrap() } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index eb5a8e9..0b94f0a 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -373,6 +373,7 @@ class CometExecSuite extends CometTestBase { withParquetDataFrame((0 until 5).map(i => (i, i + 1))) { df => assert(df.where("_1 IS NOT NULL").count() == 5) checkSparkAnswerAndOperator(df) + assert(df.select().limit(2).count() === 2) } }
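The new test, `df.select().limit(2).count()`, projects zero columns, which is the case where a row count cannot be derived from column lengths and has to be passed explicitly. The sketch below is a toy model of that situation using only the standard library; `ToyBatch` is hypothetical and is not the arrow-rs `RecordBatch` API.

```rust
/// Toy stand-in for a record batch; NOT the arrow-rs `RecordBatch` type.
struct ToyBatch {
    columns: Vec<Vec<i64>>,
    row_count: usize,
}

impl ToyBatch {
    /// Builds a batch. The row count is taken from `explicit_rows` when given,
    /// otherwise inferred from the first column. Fails when neither source is
    /// available or when a column length disagrees with the row count.
    fn try_new(columns: Vec<Vec<i64>>, explicit_rows: Option<usize>) -> Result<Self, String> {
        let row_count = match (explicit_rows, columns.first()) {
            (Some(n), _) => n,
            (None, Some(first)) => first.len(),
            (None, None) => return Err("no columns and no explicit row count".to_string()),
        };
        if columns.iter().any(|c| c.len() != row_count) {
            return Err("column length does not match row count".to_string());
        }
        Ok(ToyBatch { columns, row_count })
    }

    /// Number of columns held by the batch.
    fn num_columns(&self) -> usize {
        self.columns.len()
    }
}
```

In this model, `ToyBatch::try_new(vec![], None)` fails while `ToyBatch::try_new(vec![], Some(2))` yields a two-row batch with no columns, which is the shape of result the commit makes possible by threading `num_rows` into `make_batch`.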
(arrow-datafusion-comet) branch main updated: build: Add CI for TPCDS queries (#99)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 38b0bfd build: Add CI for TPCDS queries (#99) 38b0bfd is described below commit 38b0bfdd80b90a26919ee010778b4f3a3b58249c Author: Liang-Chi Hsieh AuthorDate: Sat Feb 24 08:23:29 2024 -0800 build: Add CI for TPCDS queries (#99) --- .github/workflows/pr_build.yml | 61 ++ pom.xml| 1 + .../apache/spark/sql/CometTPCDSQuerySuite.scala| 3 +- 3 files changed, 64 insertions(+), 1 deletion(-) diff --git a/.github/workflows/pr_build.yml b/.github/workflows/pr_build.yml index 55117d3..669eddd 100644 --- a/.github/workflows/pr_build.yml +++ b/.github/workflows/pr_build.yml @@ -131,3 +131,64 @@ jobs: - if: matrix.test-target == 'java' name: Java test steps uses: ./.github/actions/java-test + + tpcds-1g: +name: Run TPC-DS queries with SF=1 +runs-on: ubuntu-latest +container: + image: amd64/rust +env: + JAVA_VERSION: 11 +steps: + - uses: actions/checkout@v4 + - name: Setup Rust & Java toolchain +uses: ./.github/actions/setup-builder +with: + rust-version: ${{env.RUST_VERSION}} + jdk-version: 11 + + - name: Cache TPC-DS generated data +id: cache-tpcds-sf-1 +uses: actions/cache@v4 +with: + path: ./tpcds-sf-1 + key: tpcds-${{ hashFiles('.github/workflows/pr_build.yml') }} + - name: Checkout tpcds-kit repository +if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true' +uses: actions/checkout@v4 +with: + repository: databricks/tpcds-kit + path: ./tpcds-kit + - name: Build Comet +run: make release + - name: Build tpcds-kit +if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true' +run: | + apt-get install -y yacc bison flex + cd tpcds-kit/tools && make OS=LINUX + - name: Generate TPC-DS (SF=1) table data +if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true' +run: | + cd spark && MAVEN_OPTS='-Xmx20g' ../mvnw exec:java 
-Dexec.mainClass="org.apache.spark.sql.GenTPCDSData" -Dexec.classpathScope="test" -Dexec.cleanupDaemonThreads="false" -Dexec.args="--dsdgenDir `pwd`/../tpcds-kit/tools --location `pwd`/../tpcds-sf-1 --scaleFactor 1 --numPartitions 1" + cd .. + - name: Run TPC-DS queries (Sort merge join) +run: | + SPARK_HOME=`pwd` SPARK_TPCDS_DATA=`pwd`/tpcds-sf-1 ./mvnw -Dsuites=org.apache.spark.sql.CometTPCDSQuerySuite test +env: + SPARK_TPCDS_JOIN_CONF: | +spark.sql.autoBroadcastJoinThreshold=-1 +spark.sql.join.preferSortMergeJoin=true + - name: Run TPC-DS queries (Broadcast hash join) +run: | + SPARK_HOME=`pwd` SPARK_TPCDS_DATA=`pwd`/tpcds-sf-1 ./mvnw -Dsuites=org.apache.spark.sql.CometTPCDSQuerySuite test +env: + SPARK_TPCDS_JOIN_CONF: | +spark.sql.autoBroadcastJoinThreshold=10485760 + - name: Run TPC-DS queries (Shuffled hash join) +run: | + SPARK_HOME=`pwd` SPARK_TPCDS_DATA=`pwd`/tpcds-sf-1 ./mvnw -Dsuites=org.apache.spark.sql.CometTPCDSQuerySuite test +env: + SPARK_TPCDS_JOIN_CONF: | +spark.sql.autoBroadcastJoinThreshold=-1 +spark.sql.join.forceApplyShuffledHashJoin=true + diff --git a/pom.xml b/pom.xml index 507ec78..5cac3c8 100644 --- a/pom.xml +++ b/pom.xml @@ -835,6 +835,7 @@ under the License. **/test/resources/** **/benchmarks/*.txt **/inspections/*.txt +tpcds-kit/** diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala index 443c1cb..5d7b603 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala @@ -26,7 +26,8 @@ import org.apache.comet.CometConf class CometTPCDSQuerySuite extends { // This is private in `TPCDSBase`. - val excludedTpcdsQueries: Seq[String] = Seq("q34", "q64") + val excludedTpcdsQueries: Seq[String] = +Seq("q34", "q66", "q64", "q71", "q88", "q90", "q96") // This is private in `TPCDSBase` and `excludedTpcdsQueries` is private too. 
// So we cannot override `excludedTpcdsQueries` to exclude the queries.
(arrow-datafusion-comet) branch main updated: test: Enable TPCDS q41 in CometTPCDSQuerySuite (#98)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git

The following commit(s) were added to refs/heads/main by this push:
     new 51ec49b test: Enable TPCDS q41 in CometTPCDSQuerySuite (#98)
51ec49b is described below

commit 51ec49bf9e4554de91ece9352ca73afcb9e51e28
Author: Liang-Chi Hsieh
AuthorDate: Fri Feb 23 08:40:21 2024 -0800

    test: Enable TPCDS q41 in CometTPCDSQuerySuite (#98)
---
 spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala
index 92b4aa7..443c1cb 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala
@@ -26,7 +26,7 @@ import org.apache.comet.CometConf
 class CometTPCDSQuerySuite
     extends {
       // This is private in `TPCDSBase`.
-      val excludedTpcdsQueries: Seq[String] = Seq("q34", "q41", "q64")
+      val excludedTpcdsQueries: Seq[String] = Seq("q34", "q64")
 
       // This is private in `TPCDSBase` and `excludedTpcdsQueries` is private too.
       // So we cannot override `excludedTpcdsQueries` to exclude the queries.
(arrow-datafusion-comet) branch main updated: feat: Support `First`/`Last` aggregate functions (#97)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new fbe7f80 feat: Support `First`/`Last` aggregate functions (#97) fbe7f80 is described below commit fbe7f80cc4045516822e8dfa8e89a944c757c2f2 Author: Huaxin Gao AuthorDate: Fri Feb 23 08:39:54 2024 -0800 feat: Support `First`/`Last` aggregate functions (#97) Co-authored-by: Huaxin Gao --- core/src/execution/datafusion/planner.rs | 25 +- core/src/execution/proto/expr.proto| 14 .../org/apache/comet/serde/QueryPlanSerde.scala| 38 - .../apache/comet/exec/CometAggregateSuite.scala| 95 ++ 4 files changed, 156 insertions(+), 16 deletions(-) diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs index 2feaace..66a29cb 100644 --- a/core/src/execution/datafusion/planner.rs +++ b/core/src/execution/datafusion/planner.rs @@ -42,7 +42,8 @@ use datafusion_common::ScalarValue; use datafusion_physical_expr::{ execution_props::ExecutionProps, expressions::{ -CaseExpr, CastExpr, Count, InListExpr, IsNullExpr, Max, Min, NegativeExpr, NotExpr, Sum, +CaseExpr, CastExpr, Count, FirstValue, InListExpr, IsNullExpr, LastValue, Max, Min, +NegativeExpr, NotExpr, Sum, }, AggregateExpr, ScalarFunctionExpr, }; @@ -900,6 +901,28 @@ impl PhysicalPlanner { } } } +AggExprStruct::First(expr) => { +let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?; +let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); +Ok(Arc::new(FirstValue::new( +child, +"first", +datatype, +vec![], +vec![], +))) +} +AggExprStruct::Last(expr) => { +let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?; +let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); +Ok(Arc::new(LastValue::new( +child, +"last", +datatype, +vec![], +vec![], +))) +} } } diff --git 
a/core/src/execution/proto/expr.proto b/core/src/execution/proto/expr.proto index 53035b8..a80335c 100644 --- a/core/src/execution/proto/expr.proto +++ b/core/src/execution/proto/expr.proto @@ -85,6 +85,8 @@ message AggExpr { Min min = 4; Max max = 5; Avg avg = 6; +First first = 7; +Last last = 8; } } @@ -115,6 +117,18 @@ message Avg { bool fail_on_error = 4; // currently unused (useful for deciding Ansi vs Legacy mode) } +message First { + Expr child = 1; + DataType datatype = 2; + bool ignore_nulls = 3; +} + +message Last { + Expr child = 1; + DataType datatype = 2; + bool ignore_nulls = 3; +} + message Literal { oneof value { bool bool_val = 1; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 938e49f..fcc0ca9 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, Count, Final, Max, Min, Partial, Sum} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, Count, Final, First, Last, Max, Min, Partial, Sum} import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, SinglePartition} @@ -287,6 +287,42 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { } else { None } + case first @ First(child, ignoreNulls) + if !ignoreNulls => // DataFusion doesn't support ignoreNulls true +val childExpr = exprToProto(child, inputs) +val dataType = serializeDataType(first.dataType) + +if (childExpr.isDefined && dataType.isDefined) { + val 
firstBuilder = ExprOuterClass.First.newBuilder() + firstBuilder.setChild(childExpr.get) + firstBuilder.setDatatype(dataType.get) + + Some( +ExprOuterClass.AggExpr + .newBuilder()
(arrow-datafusion-comet) branch main updated: test: Expose thrown exception when executing query in CometTPCHQuerySuite (#96)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git

The following commit(s) were added to refs/heads/main by this push:
     new 48741c0 test: Expose thrown exception when executing query in CometTPCHQuerySuite (#96)
48741c0 is described below

commit 48741c0fc8842a4261217bab91fc6cf377234985
Author: Liang-Chi Hsieh
AuthorDate: Fri Feb 23 08:37:29 2024 -0800

    test: Expose thrown exception when executing query in CometTPCHQuerySuite (#96)
---
 .../test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala | 10 +-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
index 372a4cc..2264635 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
@@ -173,7 +173,15 @@ class CometTPCHQuerySuite extends QueryTest with CometTPCBase with SQLQueryTestH
         (segments(1).trim, segments(2).replaceAll("\\s+$", ""))
       }
 
-      assertResult(expectedSchema, s"Schema did not match\n$queryString") {
+      // Expose thrown exception when executing the query
+      val notMatchedSchemaOutput = if (schema == emptySchema) {
+        // There might be exception. See `handleExceptions`.
+        s"Schema did not match\n$queryString\nOutput/Exception: $outputString"
+      } else {
+        s"Schema did not match\n$queryString"
+      }
+
+      assertResult(expectedSchema, notMatchedSchemaOutput) {
         schema
       }
       if (shouldSortResults) {
(arrow-datafusion-comet) branch main updated: fix: Avoid exception caused by broadcasting empty result (#92)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 995404c fix: Avoid exception caused by broadcasting empty result (#92) 995404c is described below commit 995404cf0f15702135a3cf47fc3ed7de6c54f52f Author: Zhen Wang <643348...@qq.com> AuthorDate: Sat Feb 24 00:37:12 2024 +0800 fix: Avoid exception caused by broadcasting empty result (#92) --- .../comet/execution/shuffle/CometShuffleManager.scala | 2 +- .../scala/org/apache/spark/sql/comet/operators.scala | 6 +- .../scala/org/apache/comet/exec/CometExecSuite.scala | 19 +++ 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala index 51e6df5..cb34225 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala @@ -52,7 +52,7 @@ class CometShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { " Shuffle will continue to spill to disk when necessary.") } - private val sortShuffleManager = new SortShuffleManager(conf); + private val sortShuffleManager = new SortShuffleManager(conf) /** * A mapping from shuffle ids to the task ids of mappers producing output for those shuffles. 
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 07b8d5c..29e0cf2 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -82,7 +82,11 @@ abstract class CometExec extends CometPlan { out.flush() out.close() - Iterator((count, cbbos.toChunkedByteBuffer)) + if (out.size() > 0) { +Iterator((count, cbbos.toChunkedByteBuffer)) + } else { +Iterator((count, new ChunkedByteBuffer(Array.empty[ByteBuffer]))) + } } } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index d3a1bd2..eb5a8e9 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -80,6 +80,25 @@ class CometExecSuite extends CometTestBase { } } + test("CometBroadcastExchangeExec: empty broadcast") { +withSQLConf(CometConf.COMET_EXEC_BROADCAST_ENABLED.key -> "true") { + withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_a") { +withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_b") { + val df = sql( +"SELECT /*+ BROADCAST(a) */ *" + + " FROM (SELECT * FROM tbl_a WHERE _1 < 0) a JOIN tbl_b b" + + " ON a._1 = b._1") + val nativeBroadcast = find(df.queryExecution.executedPlan) { +case _: CometBroadcastExchangeExec => true +case _ => false + }.get.asInstanceOf[CometBroadcastExchangeExec] + val rows = nativeBroadcast.executeCollect() + assert(rows.isEmpty) +} + } +} + } + test("CometExec.executeColumnarCollectIterator can collect ColumnarBatch results") { withSQLConf( CometConf.COMET_EXEC_ENABLED.key -> "true",
(arrow-datafusion-comet) branch main updated (0ed183a -> b6a1407)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git from 0ed183a feat: Support multiple input sources for CometNativeExec (#87) add b6a1407 feat: Date and timestamp trunc with format array (#94) No new revisions were added by this update. Summary of changes: .../execution/datafusion/expressions/temporal.rs | 28 +- core/src/execution/kernels/temporal.rs | 715 - .../org/apache/comet/CometExpressionSuite.scala| 39 ++ .../scala/org/apache/spark/sql/CometTestBase.scala | 75 +++ 4 files changed, 849 insertions(+), 8 deletions(-)
(arrow-datafusion-comet) branch main updated: feat: Reduce memory consumption when writing sorted shuffle files (#1490) (#82)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 71d0e59 feat: Reduce memory consumption when writing sorted shuffle files (#1490) (#82) 71d0e59 is described below commit 71d0e5936fccef99ad74e0c7750d35f115af0cf7 Author: Chao Sun AuthorDate: Thu Feb 22 11:21:15 2024 -0800 feat: Reduce memory consumption when writing sorted shuffle files (#1490) (#82) --- .../main/scala/org/apache/comet/CometConf.scala| 7 ++ core/benches/row_columnar.rs | 2 + core/src/execution/datafusion/shuffle_writer.rs| 79 +++- core/src/execution/jni_api.rs | 2 + core/src/execution/shuffle/row.rs | 134 +++-- .../sql/comet/execution/shuffle/SpillWriter.java | 3 + spark/src/main/scala/org/apache/comet/Native.scala | 4 + 7 files changed, 137 insertions(+), 94 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index c78d16b..1153b55 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -224,6 +224,13 @@ object CometConf { "Ensure that Comet shuffle memory overhead factor is a double greater than 0") .createWithDefault(1.0) + val COMET_COLUMNAR_SHUFFLE_BATCH_SIZE: ConfigEntry[Int] = +conf("spark.comet.columnar.shuffle.batch.size") + .internal() + .doc("Batch size when writing out sorted spill files on the native side.") + .intConf + .createWithDefault(8192) + val COMET_SHUFFLE_PREFER_DICTIONARY_RATIO: ConfigEntry[Double] = conf( "spark.comet.shuffle.preferDictionary.ratio") .doc("The ratio of total values to distinct values in a string column to decide whether to " + diff --git a/core/benches/row_columnar.rs b/core/benches/row_columnar.rs index 46f8233..60b4133 100644 --- a/core/benches/row_columnar.rs +++ b/core/benches/row_columnar.rs 
@@ -23,6 +23,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use tempfile::Builder; const NUM_ROWS: usize = 1; +const BATCH_SIZE: usize = 5000; const NUM_COLS: usize = 100; const ROW_SIZE: usize = SparkUnsafeRow::get_row_bitset_width(NUM_COLS) + NUM_COLS * 8; @@ -67,6 +68,7 @@ fn benchmark(c: Criterion) { process_sorted_row_partition( NUM_ROWS, +BATCH_SIZE, row_address_ptr, row_size_ptr, , diff --git a/core/src/execution/datafusion/shuffle_writer.rs b/core/src/execution/datafusion/shuffle_writer.rs index 7b7911f..2e17dcf 100644 --- a/core/src/execution/datafusion/shuffle_writer.rs +++ b/core/src/execution/datafusion/shuffle_writer.rs @@ -61,6 +61,7 @@ use tokio::task; use crate::{ common::bit::ceil, +errors::{CometError, CometResult}, execution::datafusion::spark_hash::{create_hashes, pmod}, }; @@ -1153,44 +1154,60 @@ fn make_batch( } /// Checksum algorithms for writing IPC bytes. -#[derive(Debug, Clone)] -pub(crate) enum ChecksumAlgorithm { +#[derive(Clone)] +pub(crate) enum Checksum { /// CRC32 checksum algorithm. -CRC32(Option), +CRC32(Hasher), /// Adler32 checksum algorithm. -Adler32(Option), +Adler32(Adler32), } -pub(crate) fn compute_checksum( -cursor: Cursor< Vec>, -checksum_algorithm: , -) -> Result { -match checksum_algorithm { -ChecksumAlgorithm::CRC32(checksum) => { -let mut hasher = if let Some(initial) = checksum { -Hasher::new_with_initial(*initial) -} else { -Hasher::new() -}; -std::io::Seek::seek(cursor, SeekFrom::Start(0))?; -hasher.update(cursor.chunk()); +impl Checksum { +pub(crate) fn try_new(algo: i32, initial_opt: Option) -> CometResult { +match algo { +0 => { +let hasher = if let Some(initial) = initial_opt { +Hasher::new_with_initial(initial) +} else { +Hasher::new() +}; +Ok(Checksum::CRC32(hasher)) +} +1 => { +let hasher = if let Some(initial) = initial_opt { +// Note that Adler32 initial state is not zero. +// i.e., `Adler32::from_checksum(0)` is not the same as `Adler32::new()`. 
+Adler32::from_checksum(initial) +} else { +Adler32::new() +}; +Ok(Checksum::Adler32(hasher)) +} +_ => Err(CometError::Internal( +"Unsupported checksu
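The comment in the diff above notes that the Adler-32 initial state is not zero, so resuming from a stored checksum of 0 is different from starting fresh. A minimal, std-only sketch of why is given below. `Adler32State` and its methods are hypothetical names written for illustration; this is not the Adler-32 crate that Comet uses, only the textbook algorithm.

```rust
/// Largest prime below 2^16, the modulus used by Adler-32.
const MOD_ADLER: u32 = 65_521;

/// Running Adler-32 state (hypothetical helper, for illustration only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Adler32State {
    a: u32,
    b: u32,
}

impl Adler32State {
    /// Fresh state: `a` starts at 1, so the checksum of empty input is 1, not 0.
    pub fn new() -> Self {
        Adler32State { a: 1, b: 0 }
    }

    /// Resume from a previously finished checksum: low 16 bits are `a`, high 16 bits are `b`.
    pub fn from_checksum(checksum: u32) -> Self {
        Adler32State {
            a: checksum & 0xffff,
            b: checksum >> 16,
        }
    }

    /// Feed more bytes into the running sums.
    pub fn update(&mut self, bytes: &[u8]) {
        for &byte in bytes {
            self.a = (self.a + u32::from(byte)) % MOD_ADLER;
            self.b = (self.b + self.a) % MOD_ADLER;
        }
    }

    /// Pack the two sums into the 32-bit checksum value.
    pub fn finish(&self) -> u32 {
        (self.b << 16) | self.a
    }
}
```

With this sketch, `Adler32State::new().finish()` is 1 while `Adler32State::from_checksum(0).finish()` is 0, which is the distinction the `try_new` code has to respect when no initial value is supplied.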
(arrow-datafusion-comet) branch main updated: build: Support built with java 1.8 (#45)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 1acb56f build: Support built with java 1.8 (#45) 1acb56f is described below commit 1acb56fa9a6cb9e82bf5188166f30b4b54593f99 Author: advancedxy AuthorDate: Thu Feb 22 02:15:32 2024 +0800 build: Support built with java 1.8 (#45) Co-authored-by: Liang-Chi Hsieh --- .github/actions/java-test/action.yaml | 5 +- .github/actions/rust-test/action.yaml | 11 +- .github/actions/setup-builder/action.yaml | 4 +- .github/actions/setup-macos-builder/action.yaml| 4 +- .github/workflows/pr_build.yml | 118 ++--- common/pom.xml | 2 +- .../java/org/apache/comet/parquet/FileReader.java | 4 + pom.xml| 4 +- .../scala/org/apache/spark/sql/GenTPCHData.scala | 2 +- 9 files changed, 81 insertions(+), 73 deletions(-) diff --git a/.github/actions/java-test/action.yaml b/.github/actions/java-test/action.yaml index 6c3af79..9c7de9a 100644 --- a/.github/actions/java-test/action.yaml +++ b/.github/actions/java-test/action.yaml @@ -37,9 +37,10 @@ runs: - name: Run Maven compile shell: bash run: | -./mvnw -B compile test-compile scalafix:scalafix -Psemanticdb +echo "JAVA_VERSION=${JAVA_VERSION}" +./mvnw -B compile test-compile scalafix:scalafix -Psemanticdb -Djava.version=${JAVA_VERSION} - name: Run tests shell: bash run: | -SPARK_HOME=`pwd` ./mvnw -B clean install +SPARK_HOME=`pwd` ./mvnw -B clean install -Djava.version=${JAVA_VERSION} diff --git a/.github/actions/rust-test/action.yaml b/.github/actions/rust-test/action.yaml index b66b639..a013a7b 100644 --- a/.github/actions/rust-test/action.yaml +++ b/.github/actions/rust-test/action.yaml @@ -51,15 +51,20 @@ runs: shell: bash run: | cd common -../mvnw -B clean compile -DskipTests +../mvnw -B clean compile -DskipTests -Djava.version=${JAVA_VERSION} - name: Run Cargo test shell: bash run: | cd 
core # This is required to run some JNI related tests on the Rust side + JAVA_LD_LIBRARY_PATH=$JAVA_HOME/lib/server:$JAVA_HOME/lib:$JAVA_HOME/lib/jli +# special handing for java 1.8 for both linux and mac distributions +if [ $JAVA_VERSION == "8" ]; then + JAVA_LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/jli:$JAVA_HOME/jre/lib/server:$JAVA_HOME/jre/lib:$JAVA_HOME/jre/lib/jli +fi RUST_BACKTRACE=1 \ - LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$JAVA_HOME/lib:$JAVA_HOME/lib/server:$JAVA_HOME/lib/jli \ - DYLD_LIBRARY_PATH=$DYLD_LIBRARY_PATH:$JAVA_HOME/lib:$JAVA_HOME/lib/server:$JAVA_HOME/lib/jli \ +LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$JAVA_LD_LIBRARY_PATH \ +DYLD_LIBRARY_PATH=$DYLD_LIBRARY_PATH:$JAVA_LD_LIBRARY_PATH \ cargo test diff --git a/.github/actions/setup-builder/action.yaml b/.github/actions/setup-builder/action.yaml index f1aeb25..6fa7164 100644 --- a/.github/actions/setup-builder/action.yaml +++ b/.github/actions/setup-builder/action.yaml @@ -38,7 +38,9 @@ runs: - name: Install JDK ${{inputs.jdk-version}} uses: actions/setup-java@v4 with: -distribution: 'adopt' +# distribution is chosen to be zulu as it still offers JDK 8 with Silicon support, which +# is not available in the adopt distribution +distribution: 'zulu' java-version: ${{inputs.jdk-version}} - name: Set JAVA_HOME diff --git a/.github/actions/setup-macos-builder/action.yaml b/.github/actions/setup-macos-builder/action.yaml index 1f680ed..63010ea 100644 --- a/.github/actions/setup-macos-builder/action.yaml +++ b/.github/actions/setup-macos-builder/action.yaml @@ -59,7 +59,9 @@ runs: - name: Install JDK ${{inputs.jdk-version}} uses: actions/setup-java@v4 with: -distribution: 'adopt' +# distribution is chosen to be zulu as it still offers JDK 8 with Silicon support, which +# is not available in the adopt distribution +distribution: 'zulu' java-version: ${{inputs.jdk-version}} architecture: ${{inputs.jdk-architecture}} diff --git a/.github/workflows/pr_build.yml 
b/.github/workflows/pr_build.yml index 51962cd..55117d3 100644 --- a/.github/workflows/pr_build.yml +++ b/.github/workflows/pr_build.yml @@ -35,105 +35,99 @@ on: workflow_dispatch: env: - JAVA_VERSION: 17 RUST_VERSION: nightly jobs: - linux-rust-test: -name: Rust test (amd64) -runs-on: ubuntu-latest + linux-test: +strategy: + matrix:
(arrow-datafusion-comet) branch main updated: fix: Fix the UnionExec match branches in CometExecRule (#68)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 97ed2b8 fix: Fix the UnionExec match branches in CometExecRule (#68) 97ed2b8 is described below commit 97ed2b8a39a2fa30ba611aa8a64c68beaa995cc1 Author: wankun AuthorDate: Wed Feb 21 23:48:34 2024 +0800 fix: Fix the UnionExec match branches in CometExecRule (#68) --- spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index f4f56f0..489504d 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -327,6 +327,8 @@ class CometSparkSessionExtensions case Some(nativeOp) => val cometOp = CometUnionExec(u, u.children) CometSinkPlaceHolder(nativeOp, u, cometOp) +case None => + u } // Native shuffle for Comet operators
(arrow-datafusion-comet) branch main updated: feat: Add `CometNativeException` for exceptions thrown from the native side (#62)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 8246739 feat: Add `CometNativeException` for exceptions thrown from the native side (#62) 8246739 is described below commit 8246739a8dab0ae0b03bdc0d760c75c7c2de33c0 Author: Chao Sun AuthorDate: Tue Feb 20 22:42:05 2024 -0800 feat: Add `CometNativeException` for exceptions thrown from the native side (#62) --- .../org/apache/comet/CometNativeException.java | 27 ++ core/src/errors.rs | 2 +- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/common/src/main/java/org/apache/comet/CometNativeException.java b/common/src/main/java/org/apache/comet/CometNativeException.java new file mode 100644 index 000..6251475 --- /dev/null +++ b/common/src/main/java/org/apache/comet/CometNativeException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet; + +/** Parent class for all exceptions thrown from Comet native side. 
*/ +public class CometNativeException extends CometRuntimeException { + public CometNativeException(String message) { +super(message); + } +} diff --git a/core/src/errors.rs b/core/src/errors.rs index 936d97d..0da2c9c 100644 --- a/core/src/errors.rs +++ b/core/src/errors.rs @@ -187,7 +187,7 @@ impl jni::errors::ToException for CometError { msg: self.to_string(), }, _other => Exception { -class: "org/apache/comet/CometRuntimeException".to_string(), +class: "org/apache/comet/CometNativeException".to_string(), msg: self.to_string(), }, }
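The change above makes `org/apache/comet/CometNativeException` the fallback class for errors raised on the native side. A simplified, std-only sketch of that kind of mapping follows. `NativeError` and `exception_class` are hypothetical stand-ins, the real `CometError` has more variants, and rethrowing a captured Java exception under its original class is an assumption of this sketch rather than something the diff shows.

```rust
use std::fmt;

/// Simplified stand-in for a native error type (hypothetical, for illustration).
#[derive(Debug)]
pub enum NativeError {
    /// An error that originated in native code.
    Internal(String),
    /// An exception captured while native code was calling into the JVM.
    JavaException { class: String, msg: String },
}

impl fmt::Display for NativeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // The wording of this message is an assumption of the sketch.
            NativeError::Internal(msg) => write!(f, "Internal error: {}", msg),
            // Same "{class}: {msg}" layout as the `JavaException` variant in the earlier diff.
            NativeError::JavaException { class, msg } => write!(f, "{}: {}", class, msg),
        }
    }
}

impl std::error::Error for NativeError {}

/// Picks the JVM class name to throw for an error.
/// Assumed policy: keep the original class for captured Java exceptions,
/// otherwise fall back to the native exception class.
pub fn exception_class(err: &NativeError) -> &str {
    match err {
        NativeError::JavaException { class, .. } => class.as_str(),
        _ => "org/apache/comet/CometNativeException",
    }
}
```

Keeping one fallback class means JVM callers can catch every natively raised failure with a single `catch` clause, which appears to be the motivation for the new parent class.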
(arrow-datafusion-comet) branch main updated: feat: Handle exception thrown from native side (#61)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 180f962 feat: Handle exception thrown from native side (#61) 180f962 is described below commit 180f962c17c4a3362e3a4b25e0d3462aba4cab21 Author: Chao Sun AuthorDate: Tue Feb 20 12:46:15 2024 -0800 feat: Handle exception thrown from native side (#61) This PR catches exceptions thrown when the native side calls Java methods and converts them into a `CometError::JavaException`, which can then be properly propagated to the JVM. --- core/src/errors.rs | 3 + .../execution/datafusion/expressions/subquery.rs | 26 ++-- core/src/jvm_bridge/mod.rs | 157 ++--- 3 files changed, 153 insertions(+), 33 deletions(-) diff --git a/core/src/errors.rs b/core/src/errors.rs index 7188ebd..936d97d 100644 --- a/core/src/errors.rs +++ b/core/src/errors.rs @@ -122,6 +122,9 @@ pub enum CometError { #[from] source: DataFusionError, }, + +#[error("{class}: {msg}")] +JavaException { class: String, msg: String }, } pub fn init() { diff --git a/core/src/execution/datafusion/expressions/subquery.rs b/core/src/execution/datafusion/expressions/subquery.rs index a4b32ba..7cae129 100644 --- a/core/src/execution/datafusion/expressions/subquery.rs +++ b/core/src/execution/datafusion/expressions/subquery.rs @@ -93,7 +93,7 @@ impl PhysicalExpr for Subquery { let mut env = JVMClasses::get_env(); unsafe { -let is_null = jni_static_call!(env, +let is_null = jni_static_call!( env, comet_exec.is_null(self.exec_context_id, self.id) -> jboolean )?; @@ -105,50 +105,50 @@ impl PhysicalExpr for Subquery { match _type { DataType::Boolean => { -let r = jni_static_call!(env, +let r = jni_static_call!( env, comet_exec.get_bool(self.exec_context_id, self.id) -> jboolean )?; Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(r > 0)))) } DataType::Int8 => { -let r = 
jni_static_call!(env, +let r = jni_static_call!( env, comet_exec.get_byte(self.exec_context_id, self.id) -> jbyte )?; Ok(ColumnarValue::Scalar(ScalarValue::Int8(Some(r)))) } DataType::Int16 => { -let r = jni_static_call!(env, +let r = jni_static_call!( env, comet_exec.get_short(self.exec_context_id, self.id) -> jshort )?; Ok(ColumnarValue::Scalar(ScalarValue::Int16(Some(r)))) } DataType::Int32 => { -let r = jni_static_call!(env, +let r = jni_static_call!( env, comet_exec.get_int(self.exec_context_id, self.id) -> jint )?; Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(r)))) } DataType::Int64 => { -let r = jni_static_call!(env, +let r = jni_static_call!( env, comet_exec.get_long(self.exec_context_id, self.id) -> jlong )?; Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(r)))) } DataType::Float32 => { -let r = jni_static_call!(env, +let r = jni_static_call!( env, comet_exec.get_float(self.exec_context_id, self.id) -> f32 )?; Ok(ColumnarValue::Scalar(ScalarValue::Float32(Some(r)))) } DataType::Float64 => { -let r = jni_static_call!(env, +let r = jni_static_call!( env, comet_exec.get_double(self.exec_context_id, self.id) -> f64 )?; Ok(ColumnarValue::Scalar(ScalarValue::Float64(Some(r)))) } DataType::Decimal128(p, s) => { -let bytes = jni_static_call!(env, +let bytes = jni_static_call!( env, comet_exec.get_decimal(self.exec_context_id, self.id) -> BinaryWrapper )?; let bytes: = bytes.get().into(); @@ -161,14 +161,14 @@ impl PhysicalExpr for Subquery { ))) } DataType::Date32 => { -
(arrow-datafusion-comet) branch main updated: feat: Upgrade to `jni-rs` 0.21 (#50)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 7018225 feat: Upgrade to `jni-rs` 0.21 (#50) 7018225 is described below commit 701822599e6e14dd49d3dda481691ea5a38e498f Author: Chao Sun AuthorDate: Tue Feb 20 10:30:42 2024 -0800 feat: Upgrade to `jni-rs` 0.21 (#50) --- core/Cargo.lock| 104 +- core/Cargo.toml| 4 +- core/src/errors.rs | 169 .../execution/datafusion/expressions/subquery.rs | 204 +-- core/src/execution/jni_api.rs | 219 +++-- core/src/execution/metrics/utils.rs| 32 +-- core/src/jvm_bridge/comet_exec.rs | 102 +- core/src/jvm_bridge/comet_metric_node.rs | 22 +-- core/src/jvm_bridge/mod.rs | 24 +-- core/src/lib.rs| 8 +- core/src/parquet/mod.rs| 168 +--- core/src/parquet/util/jni.rs | 28 +-- core/src/parquet/util/mod.rs | 2 - 13 files changed, 602 insertions(+), 484 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index 0585d7e..9c40b91 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -1087,7 +1087,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -1336,7 +1336,7 @@ version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" dependencies = [ - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -1436,7 +1436,7 @@ checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455" dependencies = [ "hermit-abi", "rustix", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -1472,18 +1472,32 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" +[[package]] +name = "java-locator" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90003f2fd9c52f212c21d8520f1128da0080bad6fff16b68fe6e7f2f0c3780c2" +dependencies = [ + "glob", + "lazy_static", +] + [[package]] name = "jni" -version = "0.19.0" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6df18c2e3db7e453d3c6ac5b3e9d5182664d28788126d39b91f2d1e22b017ec" +checksum = "1a87aa2bb7d2af34197c04845522473242e1aa17c12f4935d5856491a7fb8c97" dependencies = [ "cesu8", + "cfg-if", "combine", + "java-locator", "jni-sys", + "libloading", "log", "thiserror", "walkdir", + "windows-sys 0.45.0", ] [[package]] @@ -1586,6 +1600,16 @@ version = "0.2.151" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" +[[package]] +name = "libloading" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b67380fd3b2fbe7527a606e18729d21c6f3951633d0500574c4dc22d2d638b9f" +dependencies = [ + "cfg-if", + "winapi", +] + [[package]] name = "libm" version = "0.2.8" @@ -2319,7 +2343,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -2602,7 +2626,7 @@ dependencies = [ "fastrand", "redox_syscall", "rustix", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -3009,6 +3033,15 @@ dependencies = [ "windows-targets 0.52.0", ] +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets 0.42.2", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -3018,6 +3051,21 @@
(arrow-datafusion-comet) branch main updated: build: Fix mvn cache for containerized runners (#48)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new a07cce3 build: Fix mvn cache for containerized runners (#48) a07cce3 is described below commit a07cce3335b9718f0f00d786532ab1d47d6f4749 Author: advancedxy AuthorDate: Wed Feb 21 02:03:51 2024 +0800 build: Fix mvn cache for containerized runners (#48) --- .github/actions/java-test/action.yaml | 12 +++- .github/actions/rust-test/action.yaml | 10 ++ 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/.github/actions/java-test/action.yaml b/.github/actions/java-test/action.yaml index 1b4075a..6c3af79 100644 --- a/.github/actions/java-test/action.yaml +++ b/.github/actions/java-test/action.yaml @@ -27,17 +27,19 @@ runs: - name: Cache Maven dependencies uses: actions/cache@v4 with: -path: ~/.m2/repository -key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} +path: | + ~/.m2/repository + /root/.m2/repository +key: ${{ runner.os }}-java-maven-${{ hashFiles('**/pom.xml') }} restore-keys: | - ${{ runner.os }}-maven- + ${{ runner.os }}-java-maven- - name: Run Maven compile shell: bash run: | -./mvnw compile test-compile scalafix:scalafix -Psemanticdb +./mvnw -B compile test-compile scalafix:scalafix -Psemanticdb - name: Run tests shell: bash run: | -SPARK_HOME=`pwd` ./mvnw clean install +SPARK_HOME=`pwd` ./mvnw -B clean install diff --git a/.github/actions/rust-test/action.yaml b/.github/actions/rust-test/action.yaml index 2b4ec36..b66b639 100644 --- a/.github/actions/rust-test/action.yaml +++ b/.github/actions/rust-test/action.yaml @@ -40,16 +40,18 @@ runs: - name: Cache Maven dependencies uses: actions/cache@v4 with: -path: ~/.m2/repository -key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} +path: | + ~/.m2/repository + /root/.m2/repository +key: ${{ runner.os }}-rust-maven-${{ 
hashFiles('**/pom.xml') }} restore-keys: | - ${{ runner.os }}-maven- + ${{ runner.os }}-rust-maven- - name: Build common module (pre-requisite for Rust tests) shell: bash run: | cd common -../mvnw clean compile -DskipTests +../mvnw -B clean compile -DskipTests - name: Run Cargo test shell: bash
(arrow-datafusion-comet) branch main updated: feat: Support Count(Distinct) and similar aggregation functions (#42)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 2820327 feat: Support Count(Distinct) and similar aggregation functions (#42) 2820327 is described below commit 2820327db4a7324067497110c2a655997b39f4f0 Author: Huaxin Gao AuthorDate: Tue Feb 20 09:01:34 2024 -0800 feat: Support Count(Distinct) and similar aggregation functions (#42) Co-authored-by: Huaxin Gao --- .../apache/comet/CometSparkSessionExtensions.scala | 8 +- .../org/apache/comet/serde/QueryPlanSerde.scala| 123 ++--- .../org/apache/spark/sql/comet/operators.scala | 2 +- .../apache/comet/exec/CometAggregateSuite.scala| 67 ++- 4 files changed, 153 insertions(+), 47 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 69d1fb3..f4f56f0 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -292,14 +292,18 @@ class CometSparkSessionExtensions newOp match { case Some(nativeOp) => val modes = aggExprs.map(_.mode).distinct - assert(modes.length == 1) + // The aggExprs could be empty. For example, if the aggregate functions only have + // distinct aggregate functions or only have group by, the aggExprs is empty and + // modes is empty too. If aggExprs is not empty, we need to verify all the aggregates + // have the same mode. 
+ assert(modes.length == 1 || modes.length == 0) CometHashAggregateExec( nativeOp, op, groupingExprs, aggExprs, child.output, -modes.head, +if (modes.nonEmpty) Some(modes.head) else None, child) case None => op diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index f178a2f..15a26a0 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, SinglePartition} import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils -import org.apache.spark.sql.comet.{CometHashAggregateExec, CometSinkPlaceHolder, DecimalPrecision} +import org.apache.spark.sql.comet.{CometHashAggregateExec, CometPlan, CometSinkPlaceHolder, DecimalPrecision} import org.apache.spark.sql.execution import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.aggregate.HashAggregateExec @@ -1653,60 +1653,97 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { _, groupingExpressions, aggregateExpressions, -_, +aggregateAttributes, _, resultExpressions, child) if isCometOperatorEnabled(op.conf, "aggregate") => -val modes = aggregateExpressions.map(_.mode).distinct - -if (modes.size != 1) { - // This shouldn't happen as all aggregation expressions should share the same mode. - // Fallback to Spark nevertheless here. 
+if (groupingExpressions.isEmpty && aggregateExpressions.isEmpty) { return None } -val mode = modes.head match { - case Partial => CometAggregateMode.Partial - case Final => CometAggregateMode.Final - case _ => return None -} - -val output = mode match { - case CometAggregateMode.Partial => child.output - case CometAggregateMode.Final => -// Assuming `Final` always follows `Partial` aggregation, this find the first -// `Partial` aggregation and get the input attributes from it. -child.collectFirst { case CometHashAggregateExec(_, _, _, _, input, Partial, _) => - input -} match { - case Some(input) => input - case _ => return None -} - case _ => return None -} - -val aggExprs = aggregateExpressions.map(aggExprToProto(_, output)) val groupingExprs = groupingExpressions.map(exprToProto(_, child.output)) -if (childOp.nonEmpty && groupingExprs.forall(_.isDefined) &am
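The `CometSparkSessionExtensions` change above relaxes the "exactly one mode" assertion so that an empty list of aggregate expressions (distinct-only or group-by-only aggregates) yields no mode at all. The same rule can be sketched as a small function returning an optional mode. `AggregateMode` and `common_mode` are hypothetical names, and the sketch returns an error where the Scala code asserts.

```rust
/// Aggregation phases relevant to the sketch (hypothetical mirror of Spark's modes).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggregateMode {
    Partial,
    Final,
}

/// Returns `Ok(None)` when there are no aggregate expressions,
/// `Ok(Some(mode))` when every expression shares one mode,
/// and an error when the modes are mixed.
pub fn common_mode(modes: &[AggregateMode]) -> Result<Option<AggregateMode>, String> {
    // Collect the distinct modes while preserving first-seen order.
    let mut distinct: Vec<AggregateMode> = Vec::new();
    for &mode in modes {
        if !distinct.contains(&mode) {
            distinct.push(mode);
        }
    }
    match distinct.as_slice() {
        [] => Ok(None),
        [mode] => Ok(Some(*mode)),
        _ => Err(format!(
            "expected a single aggregate mode, found {:?}",
            distinct
        )),
    }
}
```

Modelling the "no aggregates" case as `None` rather than picking a default mode keeps the empty case explicit for whichever operator consumes the result, which matches the `Some(modes.head)` / `None` split in the diff.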
(arrow-datafusion-comet) branch main updated: build: Fix potential libcrypto lib loading issue for X86 mac runners (#55)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new f7b88e9 build: Fix potential libcrypto lib loading issue for X86 mac runners (#55) f7b88e9 is described below commit f7b88e9db5ac71ba6151888dd14881cdf0750eb0 Author: advancedxy AuthorDate: Wed Feb 21 01:01:13 2024 +0800 build: Fix potential libcrypto lib loading issue for X86 mac runners (#55) --- .github/actions/setup-macos-builder/action.yaml | 6 ++ 1 file changed, 6 insertions(+) diff --git a/.github/actions/setup-macos-builder/action.yaml b/.github/actions/setup-macos-builder/action.yaml index d69e9fb..1f680ed 100644 --- a/.github/actions/setup-macos-builder/action.yaml +++ b/.github/actions/setup-macos-builder/action.yaml @@ -49,6 +49,12 @@ runs: unzip $PROTO_ZIP echo "$HOME/d/protoc/bin" >> $GITHUB_PATH export PATH=$PATH:$HOME/d/protoc/bin +# install openssl and setup DYLD_LIBRARY_PATH to work with libcrypto.dylib loading issues with x86_64 mac runners +# see PR https://github.com/apache/arrow-datafusion-comet/pull/55 for more details +brew install openssl +OPENSSL_LIB_PATH=$(dirname `brew list openssl | grep 'lib/libcrypto.dylib'`) +echo "openssl lib path is: ${OPENSSL_LIB_PATH}" +export DYLD_LIBRARY_PATH=$OPENSSL_LIB_PATH:$DYLD_LIBRARY_PATH - name: Install JDK ${{inputs.jdk-version}} uses: actions/setup-java@v4
(arrow-datafusion-comet) branch main updated: refactor: Remove a few duplicated occurrences (#53)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 7e206e2 refactor: Remove a few duplicated occurrences (#53) 7e206e2 is described below commit 7e206e2019f31052743cb5d0809dc6983e5897b5 Author: Chao Sun AuthorDate: Tue Feb 20 09:01:55 2024 -0800 refactor: Remove a few duplicated occurrences (#53) --- .../scala/org/apache/comet/CometExecIterator.scala | 49 +++--- .../org/apache/spark/sql/comet/operators.scala | 44 --- 2 files changed, 41 insertions(+), 52 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index 029be29..0140582 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -19,12 +19,11 @@ package org.apache.comet -import java.util.HashMap - import org.apache.spark._ import org.apache.spark.sql.comet.CometMetricNode import org.apache.spark.sql.vectorized._ +import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION} import org.apache.comet.vector.NativeUtil /** @@ -45,36 +44,31 @@ class CometExecIterator( val id: Long, inputs: Seq[Iterator[ColumnarBatch]], protobufQueryPlan: Array[Byte], -configs: HashMap[String, String], nativeMetrics: CometMetricNode) extends Iterator[ColumnarBatch] { private val nativeLib = new Native() - private val plan = nativeLib.createPlan(id, configs, protobufQueryPlan, nativeMetrics) + private val plan = { +val configs = createNativeConf +nativeLib.createPlan(id, configs, protobufQueryPlan, nativeMetrics) + } private val nativeUtil = new NativeUtil private var nextBatch: Option[ColumnarBatch] = None private var currentBatch: ColumnarBatch = null private var closed: Boolean = false private def 
peekNext(): ExecutionState = { -val result = nativeLib.peekNext(plan) -val flag = result(0) - -if (flag == 0) Pending -else if (flag == 1) { - val numRows = result(1) - val addresses = result.slice(2, result.length) - Batch(numRows = numRows.toInt, addresses = addresses) -} else { - throw new IllegalStateException(s"Invalid native flag: $flag") -} +convertNativeResult(nativeLib.peekNext(plan)) } private def executeNative( input: Array[Array[Long]], finishes: Array[Boolean], numRows: Int): ExecutionState = { -val result = nativeLib.executePlan(plan, input, finishes, numRows) +convertNativeResult(nativeLib.executePlan(plan, input, finishes, numRows)) + } + + private def convertNativeResult(result: Array[Long]): ExecutionState = { val flag = result(0) if (flag == -1) EOF else if (flag == 0) Pending @@ -87,6 +81,29 @@ class CometExecIterator( } } + /** + * Creates a new configuration map to be passed to the native side. + */ + private def createNativeConf: java.util.HashMap[String, String] = { +val result = new java.util.HashMap[String, String]() +val conf = SparkEnv.get.conf + +val maxMemory = CometSparkSessionExtensions.getCometMemoryOverhead(conf) +result.put("memory_limit", String.valueOf(maxMemory)) +result.put("memory_fraction", String.valueOf(COMET_EXEC_MEMORY_FRACTION.get())) +result.put("batch_size", String.valueOf(COMET_BATCH_SIZE.get())) +result.put("debug_native", String.valueOf(COMET_DEBUG_ENABLED.get())) + +// Strip mandatory prefix spark. 
which is not required for DataFusion session params +conf.getAll.foreach { + case (k, v) if k.startsWith("spark.datafusion") => +result.put(k.replaceFirst("spark\\.", ""), v) + case _ => +} + +result + } + /** Execution result from Comet native */ trait ExecutionState diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 7ac1084..4d8011e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -23,7 +23,7 @@ import java.io.ByteArrayOutputStream import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, NamedExpression, SortOrder} @@ -31,12 +31,12 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, import org.apache.spark.sql.catalyst
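The prefix handling in `createNativeConf` above can be illustrated in isolation. The following is only a minimal Java sketch of the same idea, not the project's code: the class name `NativeConfSketch`, the method `toNativeConf`, and the sample keys are hypothetical, and the real method additionally sets `memory_limit`, `memory_fraction`, `batch_size` and `debug_native`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the key filtering done in createNativeConf.
final class NativeConfSketch {

    // Keeps only entries whose key starts with "spark.datafusion" and
    // strips the leading "spark." so the native side sees "datafusion...".
    static Map<String, String> toNativeConf(Map<String, String> sparkConf) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : sparkConf.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith("spark.datafusion")) {
                // replaceFirst takes a regex, so the dot is escaped.
                result.put(key.replaceFirst("spark\\.", ""), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> sparkConf = new HashMap<>();
        // Sample keys chosen for illustration only.
        sparkConf.put("spark.datafusion.execution.batch_size", "8192");
        sparkConf.put("spark.executor.memory", "4g");
        System.out.println(toNativeConf(sparkConf));
    }
}
```

Keys that do not carry the `spark.datafusion` prefix are dropped rather than forwarded, which mirrors the empty `case _ =>` branch in the diff.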
(arrow-datafusion-comet) branch main updated: test: Reduce test time spent in `CometShuffleSuite` (#40)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new fb88650 test: Reduce test time spent in `CometShuffleSuite` (#40) fb88650 is described below commit fb88650aecc0c8593c0d640c3c6923f00861 Author: Chao Sun AuthorDate: Mon Feb 19 21:32:38 2024 -0800 test: Reduce test time spent in `CometShuffleSuite` (#40) --- .../org/apache/comet/exec/CometShuffleSuite.scala | 68 +++--- 1 file changed, 33 insertions(+), 35 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala index db78bc2..dc47482 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala @@ -41,14 +41,11 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla protected val numElementsForceSpillThreshold: Int = 10 - protected val encryptionEnabled: Boolean = false - override protected def sparkConf: SparkConf = { val conf = super.sparkConf conf .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveExecutionEnabled.toString) .set("spark.shuffle.unsafe.fastMergeEnabled", fastMergeEnabled.toString) - .set("spark.io.encryption.enabled", encryptionEnabled.toString) } protected val asyncShuffleEnable: Boolean @@ -780,40 +777,41 @@ class DisableAQECometAsyncShuffleSuite extends CometShuffleSuiteBase { protected val adaptiveExecutionEnabled: Boolean = false } -/** - * This suite tests the Comet shuffle encryption. Because the encryption configuration can only be - * set in SparkConf at the beginning, we need to create a separate suite for encryption. 
- */ -class CometShuffleEncryptionSuite extends CometShuffleSuiteBase { - override protected val adaptiveExecutionEnabled: Boolean = true - - override protected val asyncShuffleEnable: Boolean = false - - override protected val encryptionEnabled: Boolean = true -} - -class CometAsyncShuffleEncryptionSuite extends CometShuffleSuiteBase { - override protected val adaptiveExecutionEnabled: Boolean = true - - override protected val asyncShuffleEnable: Boolean = true - - override protected val encryptionEnabled: Boolean = true -} - -class DisableAQECometShuffleEncryptionSuite extends CometShuffleSuiteBase { - override protected val adaptiveExecutionEnabled: Boolean = false - - override protected val asyncShuffleEnable: Boolean = false - - override protected val encryptionEnabled: Boolean = true -} - -class DisableAQECometAsyncShuffleEncryptionSuite extends CometShuffleSuiteBase { - override protected val adaptiveExecutionEnabled: Boolean = false +class CometShuffleEncryptionSuite extends CometTestBase { + import testImplicits._ - override protected val asyncShuffleEnable: Boolean = true + override protected def sparkConf: SparkConf = { +val conf = super.sparkConf +conf.set("spark.io.encryption.enabled", "true") + } - override protected val encryptionEnabled: Boolean = true + test("comet columnar shuffle with encryption") { +Seq(10, 201).foreach { numPartitions => + Seq(true, false).foreach { dictionaryEnabled => +Seq(true, false).foreach { asyncEnabled => + withTempDir { dir => +val path = new Path(dir.toURI.toString, "test.parquet") +makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 1000) + +(1 until 10).map(i => $"_$i").foreach { col => + withSQLConf( +CometConf.COMET_EXEC_ENABLED.key -> "false", +CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", +CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", +CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> asyncEnabled.toString) { +readParquetFile(path.toString) { df => + val shuffled = df 
+.select($"_1") +.repartition(numPartitions, col) + checkSparkAnswer(shuffled) +} + } +} + } +} + } +} + } } class CometShuffleManagerSuite extends CometTestBase {
(arrow-datafusion-comet) branch main updated: build: Use `macos-latest` as runner image for MacOS (x86_64) (#41)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new db79910  build: Use `macos-latest` as runner image for MacOS (x86_64) (#41)
db79910 is described below

commit db79910d4844528542c74e1ed8cacefe214597f3
Author: Chao Sun
AuthorDate: Mon Feb 19 12:38:16 2024 -0800

    build: Use `macos-latest` as runner image for MacOS (x86_64) (#41)
---
 .github/workflows/pr_build.yml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/pr_build.yml b/.github/workflows/pr_build.yml
index ea8e509..51962cd 100644
--- a/.github/workflows/pr_build.yml
+++ b/.github/workflows/pr_build.yml
@@ -76,7 +76,7 @@ jobs:
   macos-rust-test:
     name: Rust test (macos)
-    runs-on: macos-13
+    runs-on: macos-latest
     steps:
       - uses: actions/checkout@v4
       - name: Setup Rust & Java toolchain
@@ -91,7 +91,7 @@ jobs:
   macos-java-test:
     name: Java test (macos)
-    runs-on: macos-13
+    runs-on: macos-latest
     steps:
       - uses: actions/checkout@v4
       - name: Setup Rust & Java toolchain
(arrow-datafusion-comet) branch main updated: test: Add some fuzz testing for cast operations (#16)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 4e19751 test: Add some fuzz testing for cast operations (#16) 4e19751 is described below commit 4e197511569685916b5f55637878f4e60068bb58 Author: Andy Grove AuthorDate: Fri Feb 16 13:01:59 2024 -0700 test: Add some fuzz testing for cast operations (#16) --- .../scala/org/apache/comet/CometCastSuite.scala| 113 + 1 file changed, 113 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala new file mode 100644 index 000..565d226 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.comet + +import java.io.File + +import scala.util.Random + +import org.apache.spark.sql.{CometTestBase, DataFrame, SaveMode} +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types.{DataType, DataTypes} + +class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { + import testImplicits._ + + ignore("cast long to short") { +castTest(generateLongs, DataTypes.ShortType) + } + + test("cast float to bool") { +castTest(generateFloats, DataTypes.BooleanType) + } + + test("cast float to int") { +castTest(generateFloats, DataTypes.IntegerType) + } + + ignore("cast float to string") { +castTest(generateFloats, DataTypes.StringType) + } + + ignore("cast string to bool") { +castTest( + Seq("TRUE", "True", "true", "FALSE", "False", "false", "1", "0", "").toDF("a"), + DataTypes.BooleanType) +fuzzCastFromString("truefalseTRUEFALSEyesno10 \t\r\n", 8, DataTypes.BooleanType) + } + + ignore("cast string to short") { +fuzzCastFromString("0123456789e+- \t\r\n", 8, DataTypes.ShortType) + } + + ignore("cast string to float") { +fuzzCastFromString("0123456789e+- \t\r\n", 8, DataTypes.FloatType) + } + + ignore("cast string to double") { +fuzzCastFromString("0123456789e+- \t\r\n", 8, DataTypes.DoubleType) + } + + ignore("cast string to date") { +fuzzCastFromString("0123456789/ \t\r\n", 16, DataTypes.DateType) + } + + ignore("cast string to timestamp") { +castTest(Seq("2020-01-01T12:34:56.123456", "T2").toDF("a"), DataTypes.TimestampType) +fuzzCastFromString("0123456789/:T \t\r\n", 32, DataTypes.TimestampType) + } + + private def generateFloats = { +val r = new Random(0) +Range(0, 1).map(_ => r.nextFloat()).toDF("a") + } + + private def generateLongs = { +val r = new Random(0) +Range(0, 1).map(_ => r.nextLong()).toDF("a") + } + + private def genString(r: Random, chars: String, maxLen: Int): String = { +val len = r.nextInt(maxLen) +Range(0, len).map(_ => 
chars.charAt(r.nextInt(chars.length))).mkString + } + + private def fuzzCastFromString(chars: String, maxLen: Int, toType: DataType) { +val r = new Random(0) +val inputs = Range(0, 1).map(_ => genString(r, chars, maxLen)) +castTest(inputs.toDF("a"), toType) + } + + private def castTest(input: DataFrame, toType: DataType) { +withTempPath { dir => + val df = roundtripParquet(input, dir) +.withColumn("converted", col("a").cast(toType)) + checkSparkAnswer(df) +} + } + + private def roundtripParquet(df: DataFrame, tempDir: File): DataFrame = { +val filename = new File(tempDir, s"castTest_${System.currentTimeMillis()}.parquet").toString +df.write.mode(SaveMode.Overwrite).parquet(filename) +spark.read.parquet(filename) + } + +}
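The fuzzing approach in `CometCastSuite` above boils down to drawing random strings from a small alphabet with a fixed seed, so that a failing input can be reproduced on the next run. A minimal Java sketch of that generator follows; the class name `FuzzStringSketch` and the helper `genInputs` are hypothetical, and the sketch covers only input generation, not the Parquet round trip or the comparison against Spark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical illustration of seeded fuzz-input generation.
final class FuzzStringSketch {

    // Builds a string of random length in [0, maxLen) from the given alphabet.
    // maxLen must be positive and chars must be non-empty.
    static String genString(Random r, String chars, int maxLen) {
        int len = r.nextInt(maxLen);
        StringBuilder sb = new StringBuilder(len);
        for (int i = 0; i < len; i++) {
            sb.append(chars.charAt(r.nextInt(chars.length())));
        }
        return sb.toString();
    }

    // Generates `count` inputs from one seeded generator, so the same seed
    // always yields the same list of inputs.
    static List<String> genInputs(long seed, String chars, int maxLen, int count) {
        Random r = new Random(seed);
        List<String> inputs = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            inputs.add(genString(r, chars, maxLen));
        }
        return inputs;
    }

    public static void main(String[] args) {
        // Alphabet and length taken from the "cast string to short" test.
        for (String s : genInputs(0L, "0123456789e+- \t\r\n", 8, 5)) {
            System.out.println("[" + s.replace("\t", "\\t").replace("\r", "\\r").replace("\n", "\\n") + "]");
        }
    }
}
```

Restricting the alphabet to characters that are plausible in the target type (digits, sign, exponent marker, whitespace) keeps most generated inputs close to the parser's edge cases instead of being rejected outright.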
(arrow-datafusion-comet) branch main updated: build: Add CI for MacOS (x64 and aarch64) (#35)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 0d145ce build: Add CI for MacOS (x64 and aarch64) (#35) 0d145ce is described below commit 0d145ceb09a0e509ad32a70a659ef2768f899acd Author: Chao Sun AuthorDate: Fri Feb 16 09:50:42 2024 -0800 build: Add CI for MacOS (x64 and aarch64) (#35) --- .../{setup-builder => java-test}/action.yaml | 39 +++--- .github/actions/rust-test/action.yaml | 63 ++ .github/actions/setup-builder/action.yaml | 13 +- .../action.yaml| 33 +- .github/workflows/pr_build.yml | 131 +++-- 5 files changed, 195 insertions(+), 84 deletions(-) diff --git a/.github/actions/setup-builder/action.yaml b/.github/actions/java-test/action.yaml similarity index 54% copy from .github/actions/setup-builder/action.yaml copy to .github/actions/java-test/action.yaml index a4cd393..1b4075a 100644 --- a/.github/actions/setup-builder/action.yaml +++ b/.github/actions/java-test/action.yaml @@ -15,30 +15,29 @@ # specific language governing permissions and limitations # under the License. -name: Prepare Builder -description: 'Prepare Build Environment' -inputs: - rust-version: -description: 'version of rust to install (e.g. 
nightly)' -required: true -default: 'nightly' - jdk-version: -description: 'jdk version to install (e.g., 17)' -required: true -default: '17' runs: using: "composite" steps: -- name: Install Build Dependencies +- name: Run Cargo build shell: bash run: | -apt-get update -apt-get install -y openjdk-${{inputs.jdk-version}}-jdk protobuf-compiler -- name: Setup Rust toolchain +cd core +cargo build + +- name: Cache Maven dependencies + uses: actions/cache@v4 + with: +path: ~/.m2/repository +key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} +restore-keys: | + ${{ runner.os }}-maven- + +- name: Run Maven compile + shell: bash + run: | +./mvnw compile test-compile scalafix:scalafix -Psemanticdb + +- name: Run tests shell: bash - # rustfmt is needed for the substrait build script run: | -echo "Installing ${{inputs.rust-version}}" -rustup toolchain install ${{inputs.rust-version}} -rustup default ${{inputs.rust-version}} -rustup component add rustfmt clippy +SPARK_HOME=`pwd` ./mvnw clean install diff --git a/.github/actions/rust-test/action.yaml b/.github/actions/rust-test/action.yaml new file mode 100644 index 000..2b4ec36 --- /dev/null +++ b/.github/actions/rust-test/action.yaml @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + + +runs: + using: "composite" + steps: +- name: Check Cargo fmt + shell: bash + run: | +cd core +cargo fmt --all -- --check --color=never + +- name: Check Cargo clippy + shell: bash + run: | +cd core +cargo clippy --color=never -- -D warnings + +- name: Check compilation + shell: bash + run: | +cd core +cargo check --benches + +- name: Cache Maven dependencies + uses: actions/cache@v4 + with: +path: ~/.m2/repository +key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} +restore-keys: | + ${{ runner.os }}-maven- + +- name: Build common module (pre-requisite for Rust tests) + shell: bash + run: | +cd common +../mvnw clean compile -DskipTests + +- name: Run Cargo test + shell: bash + run: | +cd core +# This is required to run some JNI related tests on the Rust side +RUST_BACKTRACE=1 \ + LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$JAVA_HOME/lib:$JAVA_HOME/lib/server:$JAVA_HOME/lib/jli \ + DYLD_LIBRARY_PATH=$DYLD_LIBRARY_PATH:$JAVA_HOME/lib:$JAVA_HOME/lib/server:$JAVA_HOME/lib/jli
(arrow-datafusion-comet) branch main updated: build: Re-enable Scala style checker and spotless (#21)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 2e3614d build: Re-enable Scala style checker and spotless (#21) 2e3614d is described below commit 2e3614dd9e59f8666e2b5f1b158c00aaf44ed65a Author: Chao Sun AuthorDate: Wed Feb 14 20:02:09 2024 -0800 build: Re-enable Scala style checker and spotless (#21) This PR re-enables Scala style checker and spotless Maven plugin. Now the CI should fail when the code style is incorrect. --- dev/scalastyle-config.xml| 9 + pom.xml | 2 -- scalafmt.conf| 2 +- .../scala/org/apache/spark/sql/comet/CometBatchScanExec.scala| 4 ++-- spark/src/main/scala/org/apache/spark/sql/comet/operators.scala | 4 ++-- .../test/scala/org/apache/comet/parquet/ParquetReadSuite.scala | 4 ++-- 6 files changed, 12 insertions(+), 13 deletions(-) diff --git a/dev/scalastyle-config.xml b/dev/scalastyle-config.xml index 7d01f72..92ba690 100644 --- a/dev/scalastyle-config.xml +++ b/dev/scalastyle-config.xml @@ -242,9 +242,9 @@ This file is divided into 3 sections: java,scala,org,apache,3rdParty,comet javax?\..* scala\..* - org\..* - org\.apache\..* - (?!org\.apache\.comet\.).* + org\.(?!apache\.comet).* + org\.apache\.(?!comet).* + (?!(javax?\.|scala\.|org\.apache\.comet\.)).* org\.apache\.comet\..* @@ -301,7 +301,8 @@ This file is divided into 3 sections: - + + diff --git a/pom.xml b/pom.xml index dd8d0ce..e693033 100644 --- a/pom.xml +++ b/pom.xml @@ -740,7 +740,6 @@ under the License. 
- org.apache.maven.plugins maven-source-plugin diff --git a/scalafmt.conf b/scalafmt.conf index 7c8c2b3..ac27e9e 100644 --- a/scalafmt.conf +++ b/scalafmt.conf @@ -36,6 +36,6 @@ rewrite.imports.groups = [ ["scala\\..*"], ["org\\..*"], ["org\\.apache\\..*"], - ["org\\.apache\\.comet\\..*"], ["com\\..*"], + ["org\\.apache\\.comet\\..*"], ] diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala index 866231d..d6c3c87 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala @@ -32,11 +32,11 @@ import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.execution.metric._ import org.apache.spark.sql.vectorized._ +import com.google.common.base.Objects + import org.apache.comet.MetricsSupport import org.apache.comet.shims.ShimCometBatchScanExec -import com.google.common.base.Objects - case class CometBatchScanExec(wrapped: BatchScanExec, runtimeFilters: Seq[Expression]) extends DataSourceV2ScanExecBase with ShimCometBatchScanExec diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 46fc274..eac013e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -33,12 +33,12 @@ import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.vectorized.ColumnarBatch +import com.google.common.base.Objects + import org.apache.comet.{CometConf, CometExecIterator, CometRuntimeException, CometSparkSessionExtensions} import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION} import 
org.apache.comet.serde.OperatorOuterClass.Operator -import com.google.common.base.Objects - /** * A Comet physical operator */ diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 4f92242..1cff74d 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -47,11 +47,11 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +import com.google.common.primitives.UnsignedLong + import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus -import com.google.common.primitives.UnsignedLong - abstract class ParquetReadSuite extends CometTestBase { import testImplicits._
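The import-group patterns changed in `dev/scalastyle-config.xml` above rely on negative lookahead so that Comet's own packages no longer fall into the `org`, `apache` or `3rdParty` groups. The Java sketch below only evaluates those regular expressions against a few import names. The pairing of group names with patterns is an assumption inferred from their order in the diff (the XML tags did not survive in this archive), the class `ImportGroupSketch` is hypothetical, and the sketch makes no claim about how Scalastyle itself resolves an import that matches several groups.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical illustration of the new import-group regexes.
final class ImportGroupSketch {

    // Group name -> pattern, in the order listed in the diff (assumed pairing).
    private static final Map<String, Pattern> GROUPS = new LinkedHashMap<>();
    static {
        GROUPS.put("java", Pattern.compile("javax?\\..*"));
        GROUPS.put("scala", Pattern.compile("scala\\..*"));
        GROUPS.put("org", Pattern.compile("org\\.(?!apache\\.comet).*"));
        GROUPS.put("apache", Pattern.compile("org\\.apache\\.(?!comet).*"));
        GROUPS.put("3rdParty",
            Pattern.compile("(?!(javax?\\.|scala\\.|org\\.apache\\.comet\\.)).*"));
        GROUPS.put("comet", Pattern.compile("org\\.apache\\.comet\\..*"));
    }

    // Returns every group whose pattern matches the whole import name.
    static List<String> matchingGroups(String importName) {
        List<String> hits = new ArrayList<>();
        for (Map.Entry<String, Pattern> entry : GROUPS.entrySet()) {
            if (entry.getValue().matcher(importName).matches()) {
                hits.add(entry.getKey());
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        String[] samples = {
            "java.io.File",
            "org.apache.spark.SparkConf",
            "com.google.common.base.Objects",
            "org.apache.comet.CometConf"
        };
        for (String name : samples) {
            System.out.println(name + " -> " + matchingGroups(name));
        }
    }
}
```

With the previous patterns (`org\..*` and `org\.apache\..*`), `org.apache.comet.CometConf` would also have matched the `org` and `apache` groups; under the new ones it matches only the `comet` pattern, which is consistent with the Comet imports being moved below the `com.google` imports in the Scala files of this commit.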
(arrow-datafusion-comet) branch main updated: build: Add PR template (#23)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new f5350e2  build: Add PR template (#23)
f5350e2 is described below

commit f5350e2ebe3c49efc7dfb3ccd9d8e0f18ca4be9b
Author: Chao Sun
AuthorDate: Wed Feb 14 16:39:19 2024 -0800

    build: Add PR template (#23)

    Adding PR template for the repo. This is mostly copied from DataFusion but removed the "user facing" part.
---
 .github/pull_request_template.md | 30 ++
 1 file changed, 30 insertions(+)

diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md
new file mode 100644
index 000..967a179
--- /dev/null
+++ b/.github/pull_request_template.md
@@ -0,0 +1,30 @@
+## Which issue does this PR close?
+
+
+
+Closes #.
+
+## Rationale for this change
+
+
+
+## What changes are included in this PR?
+
+
+
+## How are these changes tested?
+
+
(arrow-datafusion-comet) branch main updated (5671f95 -> 311ef6b)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git


    from 5671f95  build: Add Maven wrapper to the project (#13)
     add 311ef6b  build: Add basic CI test pipelines (#18)

No new revisions were added by this update.

Summary of changes:
 .github/actions/setup-builder/action.yaml          |  44 +++
 .github/workflows/pr_build.yml                     | 128 +
 core/src/errors.rs                                 |   1 +
 pom.xml                                            |  22
 spark/pom.xml                                      |   1 +
 .../spark/sql/comet/CometPlanStabilitySuite.scala  |   4 +-
 6 files changed, 198 insertions(+), 2 deletions(-)
 create mode 100644 .github/actions/setup-builder/action.yaml
 create mode 100644 .github/workflows/pr_build.yml
(arrow-datafusion-comet) branch main updated: build: Add Maven wrapper to the project (#13)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git The following commit(s) were added to refs/heads/main by this push: new 5671f95 build: Add Maven wrapper to the project (#13) 5671f95 is described below commit 5671f95de7593569935a26918b7dd82a1313d7fb Author: Chao Sun AuthorDate: Tue Feb 13 15:02:36 2024 -0800 build: Add Maven wrapper to the project (#13) This adds [Maven wrapper](https://maven.apache.org/wrapper/) to the project, to make the build environment more self-contained and independent from the Maven version on the host machine. --- .mvn/wrapper/maven-wrapper.jar| Bin 0 -> 62547 bytes .mvn/wrapper/maven-wrapper.properties | 19 +++ Makefile | 20 +-- mvnw | 308 ++ mvnw.cmd | 205 ++ 5 files changed, 542 insertions(+), 10 deletions(-) diff --git a/.mvn/wrapper/maven-wrapper.jar b/.mvn/wrapper/maven-wrapper.jar new file mode 100644 index 000..cb28b0e Binary files /dev/null and b/.mvn/wrapper/maven-wrapper.jar differ diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties new file mode 100644 index 000..d6c51cf --- /dev/null +++ b/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.6/apache-maven-3.9.6-bin.zip +wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar diff --git a/Makefile b/Makefile index 475c887..fe13fbd 100644 --- a/Makefile +++ b/Makefile @@ -22,27 +22,27 @@ all: core jvm core: cd core && cargo build jvm: - mvn clean package -DskipTests $(PROFILES) + ./mvnw clean package -DskipTests $(PROFILES) test: - mvn clean + ./mvnw clean # We need to compile CometException so that the cargo test can pass - mvn compile -pl common -DskipTests $(PROFILES) + ./mvnw compile -pl common -DskipTests $(PROFILES) cd core && cargo build && \ LD_LIBRARY_PATH=${LD_LIBRARY_PATH:+${LD_LIBRARY_PATH}:}${JAVA_HOME}/lib:${JAVA_HOME}/lib/server:${JAVA_HOME}/lib/jli && \ DYLD_LIBRARY_PATH=${DYLD_LIBRARY_PATH:+${DYLD_LIBRARY_PATH}:}${JAVA_HOME}/lib:${JAVA_HOME}/lib/server:${JAVA_HOME}/lib/jli \ RUST_BACKTRACE=1 cargo test - SPARK_HOME=`pwd` COMET_CONF_DIR=$(shell pwd)/conf RUST_BACKTRACE=1 mvn verify $(PROFILES) + SPARK_HOME=`pwd` COMET_CONF_DIR=$(shell pwd)/conf RUST_BACKTRACE=1 ./mvnw verify $(PROFILES) clean: cd core && cargo clean - mvn clean + ./mvnw clean rm -rf .dist bench: cd core && LD_LIBRARY_PATH=${LD_LIBRARY_PATH:+${LD_LIBRARY_PATH}:}${JAVA_HOME}/lib:${JAVA_HOME}/lib/server:${JAVA_HOME}/lib/jli && \ DYLD_LIBRARY_PATH=${DYLD_LIBRARY_PATH:+${DYLD_LIBRARY_PATH}:}${JAVA_HOME}/lib:${JAVA_HOME}/lib/server:${JAVA_HOME}/lib/jli \ RUSTFLAGS="-Ctarget-cpu=native" cargo bench $(filter-out $@,$(MAKECMDGOALS)) format: - mvn compile test-compile scalafix:scalafix -Psemanticdb $(PROFILES) - mvn spotless:apply $(PROFILES) + ./mvnw compile test-compile scalafix:scalafix -Psemanticdb $(PROFILES) + ./mvnw spotless:apply $(PROFILES) core-amd64: rustup target add x86_64-apple-darwin @@ -75,11 +75,11 @@ 
release-linux: clean cd core && RUSTFLAGS="-Ctarget-cpu=apple-m1" CC=arm64-apple-darwin21.4-clang CXX=arm64-apple-darwin21.4-clang++ CARGO_FEATURE_NEON=1 cargo build --target aarch64-apple-darwin --features nightly --release cd core && RUSTFLAGS="-Ctarget-cpu=skylake -Ctarget-feature=-prefer-256-bit" CC=o64-clang CXX=o64-clang++ cargo build --target x86_64-apple-darwin --features nightly --release cd core && RUSTFLAGS="-Ctarget-cpu=native -Ctarget-feature=-prefe
(arrow-datafusion-comet) branch comet-upstream updated (ce7852e -> 408dbe3)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a change to branch comet-upstream
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git


    from ce7852e  Update DEBUGGING.md
     add 408dbe3  add license and address comments

No new revisions were added by this update.

Summary of changes:
 LICENSE.txt | 212
 README.md   |   2 +-
 2 files changed, 213 insertions(+), 1 deletion(-)
 create mode 100644 LICENSE.txt
(arrow-datafusion-comet) branch comet-upstream updated (3feecfe -> 2b95ac4)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a change to branch comet-upstream
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git


 discard 3feecfe  Initial PR
     add 2b95ac4  Initial PR

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (3feecfe)
            \
             N -- N -- N   refs/heads/comet-upstream (2b95ac4)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.

No new revisions were added by this update.

Summary of changes:
 dev/ensure-jars-have-correct-contents.sh | 1 +
 1 file changed, 1 insertion(+)
(arrow-datafusion-comet) branch comet-upstream updated (0ad0ba7 -> 3feecfe)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a change to branch comet-upstream
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git


 discard 0ad0ba7  Initial PR for Comet donation
     add 3feecfe  Initial PR

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (0ad0ba7)
            \
             N -- N -- N   refs/heads/comet-upstream (3feecfe)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.

No new revisions were added by this update.

Summary of changes:
(arrow-datafusion-comet) branch comet-upstream created (now 0ad0ba7)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a change to branch comet-upstream
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git

      at 0ad0ba7  Initial PR for Comet donation

This branch includes the following new commits:

     new 0ad0ba7  Initial PR for Comet donation

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[arrow-datafusion] branch main updated: `PrimitiveGroupsAccumulator` should propagate timestamp timezone information properly (#7494)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git The following commit(s) were added to refs/heads/main by this push: new 4d44512946 `PrimitiveGroupsAccumulator` should propagate timestamp timezone information properly (#7494) 4d44512946 is described below commit 4d4451294629940d340160cdd06be273139728b4 Author: Chao Sun AuthorDate: Thu Sep 7 13:58:22 2023 -0700 `PrimitiveGroupsAccumulator` should propagate timestamp timezone information properly (#7494) --- datafusion/physical-expr/src/aggregate/average.rs | 12 ++--- .../src/aggregate/groups_accumulator/prim_op.rs| 8 ++-- datafusion/physical-expr/src/aggregate/min_max.rs | 24 +- datafusion/physical-expr/src/aggregate/utils.rs| 24 -- datafusion/physical-expr/src/expressions/mod.rs| 54 +- 5 files changed, 83 insertions(+), 39 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs index ccadb2c9b8..92c806f76f 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/physical-expr/src/aggregate/average.rs @@ -41,7 +41,7 @@ use datafusion_expr::type_coercion::aggregates::avg_return_type; use datafusion_expr::Accumulator; use super::groups_accumulator::EmitTo; -use super::utils::{adjust_output_array, Decimal128Averager}; +use super::utils::Decimal128Averager; /// AVG aggregate expression #[derive(Debug, Clone)] @@ -488,12 +488,10 @@ where .map(|(sum, count)| (self.avg_fn)(sum, count)) .collect::>>()?; PrimitiveArray::new(averages.into(), Some(nulls)) // no copy +.with_data_type(self.return_data_type.clone()) }; -// fix up decimal precision and scale for decimals -let array = adjust_output_array(_data_type, Arc::new(array))?; - -Ok(array) +Ok(Arc::new(array)) } // return arrays for sums and counts @@ -505,8 +503,8 @@ where let counts = UInt64Array::new(counts.into(), nulls.clone()); // zero copy let 
sums = emit_to.take_needed( self.sums); -let sums = PrimitiveArraynew(sums.into(), nulls); // zero copy -let sums = adjust_output_array(_data_type, Arc::new(sums))?; +let sums = PrimitiveArraynew(sums.into(), nulls) // zero copy +.with_data_type(self.sum_data_type.clone()); Ok(vec![ Arc::new(counts) as ArrayRef, diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs index adeaea712c..130d562712 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs @@ -22,7 +22,7 @@ use arrow_array::{ArrayRef, BooleanArray, PrimitiveArray}; use arrow_schema::DataType; use datafusion_common::Result; -use crate::{aggregate::utils::adjust_output_array, GroupsAccumulator}; +use crate::GroupsAccumulator; use super::{accumulate::NullState, EmitTo}; @@ -115,9 +115,9 @@ where fn evaluate( self, emit_to: EmitTo) -> Result { let values = emit_to.take_needed( self.values); let nulls = self.null_state.build(emit_to); -let values = PrimitiveArraynew(values.into(), Some(nulls)); // no copy - -adjust_output_array(_type, Arc::new(values)) +let values = PrimitiveArraynew(values.into(), Some(nulls)) // no copy +.with_data_type(self.data_type.clone()); +Ok(Arc::new(values)) } fn state( self, emit_to: EmitTo) -> Result> { diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs index 14e515861d..5c4c48b158 100644 --- a/datafusion/physical-expr/src/aggregate/min_max.rs +++ b/datafusion/physical-expr/src/aggregate/min_max.rs @@ -1075,8 +1075,8 @@ impl Accumulator for SlidingMinAccumulator { mod tests { use super::*; use crate::expressions::col; -use crate::expressions::tests::aggregate; -use crate::generic_test_op; +use crate::expressions::tests::{aggregate, aggregate_new}; +use crate::{generic_test_op, generic_test_op_new}; use arrow::datatypes::*; 
use arrow::record_batch::RecordBatch; use datafusion_common::Result; @@ -1494,6 +1494,26 @@ mod tests { ) } +#[test] +fn max_new_timestamp_micro() -> Result<()> { +let dt = DataType::Timestamp(TimeUnit::Microsecond, None); +let actual = TimestampMicrosecondArray::from(vec![1, 2, 3, 4, 5]) +.with_data_type(dt.clone()); +let expected: ArrayRef = + Arc::new(TimestampMicrosecondArray::from(vec![5]).with_data_type(dt.clone())); +generic_test_op_new!(Arc::new(
[arrow-rs] branch master updated (d797543 -> 15c87ae)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git. from d797543 Clarify docs that SlicesIterator ignores null values (#1504) add 15c87ae Add FFI for Arrow C Stream Interface (#1384) No new revisions were added by this update. Summary of changes: arrow/src/ffi.rs| 32 +-- arrow/src/ffi_stream.rs | 558 arrow/src/lib.rs| 1 + 3 files changed, 575 insertions(+), 16 deletions(-) create mode 100644 arrow/src/ffi_stream.rs
[arrow-rs] branch master updated: Remove Clone and copy source structs internally (#1449)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git The following commit(s) were added to refs/heads/master by this push: new d02425d Remove Clone and copy source structs internally (#1449) d02425d is described below commit d02425dc0114c4e192e950aeea13e3c1e750a2b0 Author: Liang-Chi Hsieh AuthorDate: Fri Mar 18 23:26:50 2022 -0700 Remove Clone and copy source structs internally (#1449) * Remove Clone and copy source structs internally * Remove drop_in_place and add more comment * Add export_into_raw * Fix format * Fix clippy * Move to export_array_into_raw * Fix clippy * Fix doc * Use write_unaligned --- arrow/src/array/array.rs | 24 + arrow/src/array/mod.rs | 2 +- arrow/src/ffi.rs | 94 +--- 3 files changed, 107 insertions(+), 13 deletions(-) diff --git a/arrow/src/array/array.rs b/arrow/src/array/array.rs index 795439e..1ad01f4 100644 --- a/arrow/src/array/array.rs +++ b/arrow/src/array/array.rs @@ -632,6 +632,30 @@ pub unsafe fn make_array_from_raw( let data = ArrayData::try_from(array)?; Ok(make_array(data)) } + +/// Exports an array to raw pointers of the C Data Interface provided by the consumer. +/// # Safety +/// Assumes that these pointers represent valid C Data Interfaces, both in memory +/// representation and lifetime via the `release` mechanism. +/// +/// This function copies the content of two FFI structs [ffi::FFI_ArrowArray] and +/// [ffi::FFI_ArrowSchema] in the array to the location pointed by the raw pointers. +/// Usually the raw pointers are provided by the array data consumer. 
+pub unsafe fn export_array_into_raw( +src: ArrayRef, +out_array: *mut ffi::FFI_ArrowArray, +out_schema: *mut ffi::FFI_ArrowSchema, +) -> Result<()> { +let data = src.data(); +let array = ffi::FFI_ArrowArray::new(data); +let schema = ffi::FFI_ArrowSchema::try_from(data.data_type())?; + +std::ptr::write_unaligned(out_array, array); +std::ptr::write_unaligned(out_schema, schema); + +Ok(()) +} + // Helper function for printing potentially long arrays. pub(super) fn print_long_array( array: , diff --git a/arrow/src/array/mod.rs b/arrow/src/array/mod.rs index 864bae1..fc6441b 100644 --- a/arrow/src/array/mod.rs +++ b/arrow/src/array/mod.rs @@ -521,7 +521,7 @@ pub use self::cast::{ // -- C Data Interface --- -pub use self::array::make_array_from_raw; +pub use self::array::{export_array_into_raw, make_array_from_raw}; #[cfg(test)] mod tests { diff --git a/arrow/src/ffi.rs b/arrow/src/ffi.rs index 461995b..5fb1cce 100644 --- a/arrow/src/ffi.rs +++ b/arrow/src/ffi.rs @@ -26,9 +26,10 @@ //! //! ```rust //! # use std::sync::Arc; -//! # use arrow::array::{Int32Array, Array, ArrayData, make_array_from_raw}; +//! # use arrow::array::{Int32Array, Array, ArrayData, export_array_into_raw, make_array, make_array_from_raw}; //! # use arrow::error::{Result, ArrowError}; //! # use arrow::compute::kernels::arithmetic; +//! # use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; //! # use std::convert::TryFrom; //! # fn main() -> Result<()> { //! // create an array natively @@ -51,7 +52,35 @@ //! // verify //! assert_eq!(array, Int32Array::from(vec![Some(2), None, Some(6)])); //! +//! // Simulate if raw pointers are provided by consumer +//! let array = make_array(Int32Array::from(vec![Some(1), None, Some(3)]).data().clone()); +//! +//! let out_array = Box::new(FFI_ArrowArray::empty()); +//! let out_schema = Box::new(FFI_ArrowSchema::empty()); +//! let out_array_ptr = Box::into_raw(out_array); +//! let out_schema_ptr = Box::into_raw(out_schema); +//! +//! 
// export array into raw pointers from consumer +//! unsafe { export_array_into_raw(array, out_array_ptr, out_schema_ptr)?; }; +//! +//! // import it +//! let array = unsafe { make_array_from_raw(out_array_ptr, out_schema_ptr)? }; +//! +//! // perform some operation +//! let array = array.as_any().downcast_ref::().ok_or( +//! ArrowError::ParseError("Expects an int32".to_string()), +//! )?; +//! let array = arithmetic::add(, )?; +//! +//! // verify +//! assert_eq!(array, Int32Array::from(vec![Some(2), None, Some(6)])); +//! //! // (drop/release) +//! unsafe { +//! Box::from_raw(out_array_ptr); +//! Box::from_raw(out_schema_ptr); +//! } +//! //! Ok(()) //! } //! ``` @@ -107,7 +136,7 @@ bitflags! { /// See <https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions> /// This was created by bindgen #[repr(C)] -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct FFI_ArrowSchema { format: *const c_char, name: *const c_char, @@ -336,7 +365,7 @@ fn bit_width(data_type: , i: usize) -> Result
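The export path in the commit above moves freshly built FFI structs into memory that the consumer has already allocated, using `std::ptr::write_unaligned`. A minimal sketch of that pattern, using only the standard library, might look like the following. `FfiArray` and `export_into_raw` are hypothetical stand-ins invented for illustration; they are not the arrow-rs types or functions.

```rust
use std::ptr;

/// Hypothetical stand-in for an FFI struct such as `FFI_ArrowArray`.
#[repr(C)]
#[derive(Debug, PartialEq)]
struct FfiArray {
    length: i64,
    null_count: i64,
}

impl FfiArray {
    /// An empty placeholder that the consumer allocates before the export.
    fn empty() -> Self {
        FfiArray { length: 0, null_count: 0 }
    }
}

/// Moves `src` into the consumer-provided location without reading or
/// dropping whatever was stored there before.
///
/// # Safety
/// `out` must be non-null and valid for writes of one `FfiArray`.
unsafe fn export_into_raw(src: FfiArray, out: *mut FfiArray) {
    // `write_unaligned` makes no alignment assumption about `out`,
    // which is useful when the pointer comes from foreign code.
    unsafe { ptr::write_unaligned(out, src) };
}
```

Because the value is moved rather than cloned, a design like this does not need the FFI struct to implement `Clone`, which seems to be the motivation for dropping the derive in the diff.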
[arrow-rs] branch master updated (717216f -> f0646f8)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git. from 717216f Fix generate_decimal128_case (#1440) add f0646f8 Rewrite doc example of ListArray and LargeListArray (#1447) No new revisions were added by this update. Summary of changes: arrow/src/array/array_list.rs | 86 +++ 1 file changed, 38 insertions(+), 48 deletions(-)
[arrow-rs] branch master updated: Add doc example for creating `FixedSizeListArray` (#1426)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git The following commit(s) were added to refs/heads/master by this push: new 729934c Add doc example for creating `FixedSizeListArray` (#1426) 729934c is described below commit 729934c5311b42fc7fd3522a22ddc049bdde9569 Author: Remzi Yang <59198230+haoyang...@users.noreply.github.com> AuthorDate: Sun Mar 13 01:51:34 2022 +0800 Add doc example for creating `FixedSizeListArray` (#1426) --- arrow/src/array/array_list.rs | 31 +++ 1 file changed, 31 insertions(+) diff --git a/arrow/src/array/array_list.rs b/arrow/src/array/array_list.rs index ff44540..e4036ea 100644 --- a/arrow/src/array/array_list.rs +++ b/arrow/src/array/array_list.rs @@ -343,6 +343,37 @@ pub type LargeListArray = GenericListArray; /// A list array where each element is a fixed-size sequence of values with the same /// type whose maximum length is represented by a i32. 
/// +/// # Example +/// +/// ``` +/// # use arrow::array::{Array, ArrayData, FixedSizeListArray, Int32Array}; +/// # use arrow::datatypes::{DataType, Field}; +/// # use arrow::buffer::Buffer; +/// // Construct a value array +/// let value_data = ArrayData::builder(DataType::Int32) +/// .len(9) +/// .add_buffer(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7, 8])) +/// .build() +/// .unwrap(); +/// let list_data_type = DataType::FixedSizeList( +/// Box::new(Field::new("item", DataType::Int32, false)), +/// 3, +/// ); +/// let list_data = ArrayData::builder(list_data_type.clone()) +/// .len(3) +/// .add_child_data(value_data.clone()) +/// .build() +/// .unwrap(); +/// let list_array = FixedSizeListArray::from(list_data); +/// let list0 = list_array.value(0); +/// let list1 = list_array.value(1); +/// let list2 = list_array.value(2); +/// +/// assert_eq!( &[0, 1, 2], list0.as_any().downcast_ref::().unwrap().values()); +/// assert_eq!( &[3, 4, 5], list1.as_any().downcast_ref::().unwrap().values()); +/// assert_eq!( &[6, 7, 8], list2.as_any().downcast_ref::().unwrap().values()); +/// ``` +/// /// For non generic lists, you may wish to consider using /// [crate::array::FixedSizeBinaryArray] pub struct FixedSizeListArray {
[arrow-rs] branch master updated: Support nullable keys in DictionaryArray::try_new (#1430)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git The following commit(s) were added to refs/heads/master by this push: new d9099c4 Support nullable keys in DictionaryArray::try_new (#1430) d9099c4 is described below commit d9099c4a0045f761bfa491b940bb89134f64b9a1 Author: Jörn Horstmann AuthorDate: Fri Mar 11 21:02:05 2022 +0100 Support nullable keys in DictionaryArray::try_new (#1430) * Support nullable keys in DictionaryArray::try_new * Set null count so it does not have to be recalculated --- arrow/src/array/array_dictionary.rs | 30 +- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/arrow/src/array/array_dictionary.rs b/arrow/src/array/array_dictionary.rs index a045714..3fc9545 100644 --- a/arrow/src/array/array_dictionary.rs +++ b/arrow/src/array/array_dictionary.rs @@ -98,13 +98,21 @@ impl<'a, K: ArrowPrimitiveType> DictionaryArray { // Note: This does more work than necessary by rebuilding / // revalidating all the data -let data = ArrayData::builder(dict_data_type) +let mut data = ArrayData::builder(dict_data_type) .len(keys.len()) .add_buffer(keys.data().buffers()[0].clone()) -.add_child_data(values.data().clone()) -.build()?; +.add_child_data(values.data().clone()); -Ok(data.into()) +match keys.data().null_buffer() { +Some(buffer) if keys.data().null_count() > 0 => { +data = data +.null_bit_buffer(buffer.clone()) +.null_count(keys.data().null_count()); +} +_ => data = data.null_count(0), +} + +Ok(data.build()?.into()) } /// Return an array view of the keys of this dictionary as a PrimitiveArray. 
@@ -528,8 +536,20 @@ mod tests { let array = DictionaryArraytry_new(, ).unwrap(); assert_eq!(array.keys().data_type(), ::Int32); assert_eq!(array.values().data_type(), ::Utf8); + +assert_eq!(array.data().null_count(), 1); + +assert!(array.keys().is_valid(0)); +assert!(array.keys().is_valid(1)); +assert!(array.keys().is_null(2)); +assert!(array.keys().is_valid(3)); + +assert_eq!(array.keys().value(0), 0); +assert_eq!(array.keys().value(1), 2); +assert_eq!(array.keys().value(3), 1); + assert_eq!( -"DictionaryArray {keys: PrimitiveArray\n[\n 0,\n 2,\n 0,\n 1,\n] values: StringArray\n[\n \"foo\",\n \"bar\",\n \"baz\",\n]}\n", +"DictionaryArray {keys: PrimitiveArray\n[\n 0,\n 2,\n null,\n 1,\n] values: StringArray\n[\n \"foo\",\n \"bar\",\n \"baz\",\n]}\n", format!("{:?}", array) ); }
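To make the effect of nullable keys concrete, here is a small standard-library sketch of dictionary decoding in which a null key yields a null value. The function names and the use of `Option<usize>` for keys are assumptions made for illustration; arrow-rs stores validity in a separate null bitmap rather than in `Option` values.

```rust
/// Resolves nullable dictionary keys against a slice of values.
/// A `None` key stays `None`; a `Some(i)` key becomes `values[i]`.
/// Panics if a key is out of bounds for `values`.
fn decode<'a>(keys: &[Option<usize>], values: &[&'a str]) -> Vec<Option<&'a str>> {
    keys.iter().map(|&key| key.map(|i| values[i])).collect()
}

/// Counts the null keys, mirroring the `null_count` carried by the array data.
fn null_count(keys: &[Option<usize>]) -> usize {
    keys.iter().filter(|key| key.is_none()).count()
}
```

With keys `[0, 2, null, 1]` and values `["foo", "bar", "baz"]`, as in the updated test, the third slot decodes to null and the null count is 1.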
[arrow-rs] branch master updated (7393560 -> 4e10321)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git. from 7393560 Add value_unchecked() for FixedSizeBinaryArray (#1420) add 4e10321 Fix possibly unaligned writes in MutableBuffer (#1421) No new revisions were added by this update. Summary of changes: arrow/src/buffer/mutable.rs | 105 +++- 1 file changed, 74 insertions(+), 31 deletions(-)
[arrow-rs] branch master updated: Add value_unchecked() for FixedSizeBinaryArray (#1420)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git The following commit(s) were added to refs/heads/master by this push: new 7393560 Add value_unchecked() for FixedSizeBinaryArray (#1420) 7393560 is described below commit 73935606a9ab436ccb6b71cf5c60901384f66e29 Author: jakevin <30525741+jackwe...@users.noreply.github.com> AuthorDate: Sat Mar 12 02:31:08 2022 +0800 Add value_unchecked() for FixedSizeBinaryArray (#1420) --- arrow/src/array/array_binary.rs | 12 arrow/src/array/equal/mod.rs| 13 + 2 files changed, 25 insertions(+) diff --git a/arrow/src/array/array_binary.rs b/arrow/src/array/array_binary.rs index e35133e..88ff3bf 100644 --- a/arrow/src/array/array_binary.rs +++ b/arrow/src/array/array_binary.rs @@ -473,6 +473,18 @@ impl FixedSizeBinaryArray { } } +/// Returns the element at index `i` as a byte slice. +/// # Safety +/// Caller is responsible for ensuring that the index is within the bounds of the array +pub unsafe fn value_unchecked(, i: usize) -> &[u8] { +let offset = i.checked_add(self.data.offset()).unwrap(); +let pos = self.value_offset_at(offset); +std::slice::from_raw_parts( +self.value_data.as_ptr().offset(pos as isize), +(self.value_offset_at(offset + 1) - pos) as usize, +) +} + /// Returns the offset for the element at index `i`. /// /// Note this doesn't do any bound checking, for performance reason. 
diff --git a/arrow/src/array/equal/mod.rs b/arrow/src/array/equal/mod.rs index 7bd0b70..4f27786 100644 --- a/arrow/src/array/equal/mod.rs +++ b/arrow/src/array/equal/mod.rs @@ -601,6 +601,19 @@ mod tests { } #[test] +fn test_fixed_size_binary_array() { +let a_input_arg = vec![vec![1, 2], vec![3, 4], vec![5, 6]]; +let a = FixedSizeBinaryArray::try_from_iter(a_input_arg.into_iter()).unwrap(); +let a = a.data(); + +let b_input_arg = vec![vec![1, 2], vec![3, 4], vec![5, 6]]; +let b = FixedSizeBinaryArray::try_from_iter(b_input_arg.into_iter()).unwrap(); +let b = b.data(); + +test_equal(a, b, true); +} + +#[test] fn test_string_offset() { let a = StringArray::from(vec![Some("a"), None, Some("b")]); let a = a.data();
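The checked versus unchecked accessor pattern added above can be sketched without the arrow crate. The `FixedSizeBytes` type below is hypothetical and assumes a constant element width with no array offset, which is simpler than the offset handling in `FixedSizeBinaryArray`.

```rust
/// Hypothetical container of equally sized byte strings stored back to back.
struct FixedSizeBytes {
    data: Vec<u8>,
    width: usize,
}

impl FixedSizeBytes {
    /// Panics if `width` is zero or does not evenly divide `data.len()`.
    fn new(data: Vec<u8>, width: usize) -> Self {
        assert!(width > 0 && data.len() % width == 0);
        FixedSizeBytes { data, width }
    }

    /// Number of elements in the container.
    fn len(&self) -> usize {
        self.data.len() / self.width
    }

    /// Bounds-checked access: panics if `i` is out of range.
    fn value(&self, i: usize) -> &[u8] {
        assert!(i < self.len(), "index out of bounds");
        &self.data[i * self.width..(i + 1) * self.width]
    }

    /// Unchecked access that skips the bounds check.
    ///
    /// # Safety
    /// The caller must guarantee `i < self.len()`.
    unsafe fn value_unchecked(&self, i: usize) -> &[u8] {
        unsafe {
            std::slice::from_raw_parts(self.data.as_ptr().add(i * self.width), self.width)
        }
    }
}
```

The unchecked variant is only worthwhile in hot loops where the index has already been validated, for example when iterating `0..len()`.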
[arrow-rs] branch master updated (0d24777 -> fcb8950)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git. from 0d24777 Directly write to MutableBuffer in substring (#1423) add fcb8950 Remove duplicate bound check in the function shift (#1409) No new revisions were added by this update. Summary of changes: arrow/src/compute/kernels/window.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-)
[arrow-rs] branch master updated (4bcc7a6 -> 0d24777)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git. from 4bcc7a6 Implement projection for arrow file / streams (#1339) add 0d24777 Directly write to MutableBuffer in substring (#1423) No new revisions were added by this update. Summary of changes: arrow/Cargo.toml | 4 .../{length_kernel.rs => string_kernels.rs}| 28 ++ arrow/src/compute/kernels/substring.rs | 8 +++ arrow/src/util/bench_util.rs | 11 - 4 files changed, 30 insertions(+), 21 deletions(-) copy arrow/benches/{length_kernel.rs => string_kernels.rs} (67%)
[arrow-rs] branch master updated: Add dictionary support for C data interface (#1407)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git The following commit(s) were added to refs/heads/master by this push: new f19d1ed Add dictionary support for C data interface (#1407) f19d1ed is described below commit f19d1ed71c2318407cf6beaab226355e0d005daa Author: Chao Sun AuthorDate: Tue Mar 8 23:27:43 2022 -0800 Add dictionary support for C data interface (#1407) * initial commit * add integration tests for python * address comments --- .../tests/test_sql.py | 19 ++-- arrow/src/array/ffi.rs | 22 arrow/src/datatypes/ffi.rs | 104 ++- arrow/src/ffi.rs | 111 +++-- 4 files changed, 192 insertions(+), 64 deletions(-) diff --git a/arrow-pyarrow-integration-testing/tests/test_sql.py b/arrow-pyarrow-integration-testing/tests/test_sql.py index bacd118..058a32e 100644 --- a/arrow-pyarrow-integration-testing/tests/test_sql.py +++ b/arrow-pyarrow-integration-testing/tests/test_sql.py @@ -79,6 +79,7 @@ _supported_pyarrow_types = [ pa.field("c", pa.string()), ] ), +pa.dictionary(pa.int8(), pa.string()), ] _unsupported_pyarrow_types = [ @@ -122,14 +123,6 @@ def test_type_roundtrip_raises(pyarrow_type): with pytest.raises(pa.ArrowException): rust.round_trip_type(pyarrow_type) - -def test_dictionary_type_roundtrip(): -# the dictionary type conversion is incomplete -pyarrow_type = pa.dictionary(pa.int32(), pa.string()) -ty = rust.round_trip_type(pyarrow_type) -assert ty == pa.int32() - - @pytest.mark.parametrize('pyarrow_type', _supported_pyarrow_types, ids=str) def test_field_roundtrip(pyarrow_type): pyarrow_field = pa.field("test", pyarrow_type, nullable=True) @@ -263,3 +256,13 @@ def test_decimal_python(): assert a == b del a del b + +def test_dictionary_python(): +""" +Python -> Rust -> Python +""" +a = pa.array(["a", None, "b", None, "a"], type=pa.dictionary(pa.int8(), pa.string())) +b = rust.round_trip_array(a) +assert a == b +del a +del b diff --git 
a/arrow/src/array/ffi.rs b/arrow/src/array/ffi.rs index 847649c..976c6b8 100644 --- a/arrow/src/array/ffi.rs +++ b/arrow/src/array/ffi.rs @@ -45,6 +45,7 @@ impl TryFrom for ffi::ArrowArray { #[cfg(test)] mod tests { +use crate::array::{DictionaryArray, Int32Array, StringArray}; use crate::error::Result; use crate::{ array::{ @@ -127,4 +128,25 @@ mod tests { let data = array.data(); test_round_trip(data) } + +#[test] +fn test_dictionary() -> Result<()> { +let values = StringArray::from(vec![Some("foo"), Some("bar"), None]); +let keys = Int32Array::from(vec![ +Some(0), +Some(1), +None, +Some(1), +Some(1), +None, +Some(1), +Some(2), +Some(1), +None, +]); +let array = DictionaryArray::try_new(, )?; + +let data = array.data(); +test_round_trip(data) +} } diff --git a/arrow/src/datatypes/ffi.rs b/arrow/src/datatypes/ffi.rs index fbff4a0..10645fb 100644 --- a/arrow/src/datatypes/ffi.rs +++ b/arrow/src/datatypes/ffi.rs @@ -28,7 +28,7 @@ impl TryFrom<_ArrowSchema> for DataType { /// See [CDataInterface docs](https://arrow.apache.org/docs/format/CDataInterface.html#data-type-description-format-strings) fn try_from(c_schema: _ArrowSchema) -> Result { -let dtype = match c_schema.format() { +let mut dtype = match c_schema.format() { "n" => DataType::Null, "b" => DataType::Boolean, "c" => DataType::Int8, @@ -134,6 +134,12 @@ impl TryFrom<_ArrowSchema> for DataType { } } }; + +if let Some(dict_schema) = c_schema.dictionary() { +let value_type = Self::try_from(dict_schema)?; +dtype = DataType::Dictionary(Box::new(dtype), Box::new(value_type)); +} + Ok(dtype) } } @@ -169,49 +175,7 @@ impl TryFrom<> for FFI_ArrowSchema { /// See [CDataInterface docs](https://arrow.apache.org/docs/format/CDataInterface.html#data-type-description-format-strings) fn try_from(dtype: ) -> Result { -let format = match dtype { -DataType::Null => "n".to_string(), -DataType::Boolean => "b".to_string(), -DataType::Int8 => "c".to_string(), -DataType::UInt8 => "C".to_string(), -
[arrow-rs] branch master updated: Remove redundant has_ methods for optional column metadata fields (#1345)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git The following commit(s) were added to refs/heads/master by this push: new a69f0d1 Remove redundant has_ methods for optional column metadata fields (#1345) a69f0d1 is described below commit a69f0d119c67ef19e6a61acd24359de1b4ed2e10 Author: Shani Solomon <84128749+shanisolo...@users.noreply.github.com> AuthorDate: Tue Feb 22 19:03:53 2022 +0200 Remove redundant has_ methods for optional column metadata fields (#1345) --- parquet/src/file/metadata.rs | 18 -- parquet/src/file/serialized_reader.rs | 2 -- 2 files changed, 20 deletions(-) diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs index 49ac072..779b806 100644 --- a/parquet/src/file/metadata.rs +++ b/parquet/src/file/metadata.rs @@ -469,35 +469,17 @@ impl ColumnChunkMetaData { self.statistics.as_ref() } -/// Returns `true` if this column chunk contains page encoding stats, `false` otherwise. -pub fn has_page_encoding_stats() -> bool { -self.encoding_stats.is_some() -} - /// Returns the offset for the page encoding stats, /// or `None` if no page encoding stats are available. pub fn page_encoding_stats() -> Option<> { self.encoding_stats.as_ref() } -/// Returns `true` if this column chunk contains a bloom filter offset, `false` otherwise. -pub fn has_bloom_filter() -> bool { -self.bloom_filter_offset.is_some() -} - /// Returns the offset for the bloom filter. pub fn bloom_filter_offset() -> Option { self.bloom_filter_offset } -/// Returns `true` if this column chunk contains a column index, `false` otherwise. -pub fn has_column_index() -> bool { -self.column_index_offset.is_some() -&& self.column_index_length.is_some() -&& self.offset_index_offset.is_some() -&& self.offset_index_length.is_some() -} - /// Returns the offset for the column index. 
pub fn column_index_offset() -> Option { self.column_index_offset diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 4d19be8..c91e832 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -773,7 +773,6 @@ mod tests { assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192); // test page encoding stats -assert!(col0_metadata.has_page_encoding_stats()); let page_encoding_stats = col0_metadata.page_encoding_stats().unwrap().get(0).unwrap(); @@ -782,7 +781,6 @@ mod tests { assert_eq!(page_encoding_stats.count, 1); // test optional column index offset -assert!(col0_metadata.has_column_index()); assert_eq!(col0_metadata.column_index_offset().unwrap(), 156); assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
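Once the `has_` methods are gone, callers can get the same information from the `Option` that each accessor already returns. A minimal sketch with a hypothetical, trimmed-down `ColumnMeta` struct (not the real `ColumnChunkMetaData`) could look like this:

```rust
/// Hypothetical, trimmed-down stand-in for column chunk metadata.
struct ColumnMeta {
    bloom_filter_offset: Option<i64>,
    column_index_offset: Option<i64>,
}

impl ColumnMeta {
    /// Returns the offset for the bloom filter, if any.
    fn bloom_filter_offset(&self) -> Option<i64> {
        self.bloom_filter_offset
    }

    /// Returns the offset for the column index, if any.
    fn column_index_offset(&self) -> Option<i64> {
        self.column_index_offset
    }
}

/// Branches on the `Option` directly instead of calling a separate `has_` method.
fn describe_bloom_filter(meta: &ColumnMeta) -> String {
    match meta.bloom_filter_offset() {
        Some(offset) => format!("bloom filter at offset {offset}"),
        None => String::from("no bloom filter"),
    }
}
```

A caller that only needs the boolean can write `meta.bloom_filter_offset().is_some()`.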
[arrow-rs] branch master updated: Refactor Bitmap::new (#1343)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git

The following commit(s) were added to refs/heads/master by this push:
     new 89ee9ac  Refactor Bitmap::new (#1343)
89ee9ac is described below

commit 89ee9acf01b5b9895596a3843212076ce82c9b00
Author: Remzi Yang <59198230+haoyang...@users.noreply.github.com>
AuthorDate: Mon Feb 21 13:59:05 2022 +0800

    Refactor Bitmap::new (#1343)

    ### Which issue does this PR close?
    Closes #1337.

    ### Rationale for this change
    `bit_util` has provided some functions to calculate the ceiling and
    multiple, so we can use them in `Bitmap::new` to achieve a faster and
    cleaner code.

    ### What changes are included in this PR?

    ### Are there any user-facing changes?
    None.
---
 arrow/src/bitmap.rs | 9 ++---
 1 file changed, 2 insertions(+), 7 deletions(-)

diff --git a/arrow/src/bitmap.rs b/arrow/src/bitmap.rs
index 6fb721a..4ba1bb9 100644
--- a/arrow/src/bitmap.rs
+++ b/arrow/src/bitmap.rs
@@ -35,13 +35,8 @@ pub struct Bitmap {

 impl Bitmap {
     pub fn new(num_bits: usize) -> Self {
-        let num_bytes = num_bits / 8 + if num_bits % 8 > 0 { 1 } else { 0 };
-        let r = num_bytes % 64;
-        let len = if r == 0 {
-            num_bytes
-        } else {
-            num_bytes + 64 - r
-        };
+        let num_bytes = bit_util::ceil(num_bits, 8);
+        let len = bit_util::round_upto_multiple_of_64(num_bytes);
         Bitmap {
             bits: Buffer::from(&vec![0xFF; len]),
         }
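The arithmetic being replaced in this commit is a ceiling division followed by rounding up to a multiple of 64. The sketch below reimplements both helpers with the standard library only, to show that the old and new expressions agree. The helper bodies are assumptions written for illustration and are not copied from `bit_util`.

```rust
/// Ceiling division: the number of `divisor`-sized units needed to hold `value`.
fn ceil(value: usize, divisor: usize) -> usize {
    value / divisor + usize::from(value % divisor != 0)
}

/// Rounds `num` up to the nearest multiple of 64.
fn round_upto_multiple_of_64(num: usize) -> usize {
    ceil(num, 64) * 64
}

/// Buffer length in bytes for a bitmap of `num_bits` bits (new formulation).
fn bitmap_len(num_bits: usize) -> usize {
    round_upto_multiple_of_64(ceil(num_bits, 8))
}

/// The expression removed by the commit, kept here for comparison.
fn bitmap_len_old(num_bits: usize) -> usize {
    let num_bytes = num_bits / 8 + if num_bits % 8 > 0 { 1 } else { 0 };
    let r = num_bytes % 64;
    if r == 0 { num_bytes } else { num_bytes + 64 - r }
}
```

For example, 513 bits need 65 bytes, which rounds up to a 128-byte buffer under both formulations.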
[arrow-rs] branch master updated: Don't use Arc::from_raw when importing ArrowArray and ArrowSchema (#1334)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git

The following commit(s) were added to refs/heads/master by this push:
     new f84c436  Don't use Arc::from_raw when importing ArrowArray and ArrowSchema (#1334)
f84c436 is described below

commit f84c4364d5e1cd5879f5ef9cc6441cdf233df8c3
Author: Liang-Chi Hsieh
AuthorDate: Sun Feb 20 10:26:03 2022 -0800

    Don't use Arc::from_raw when importing ArrowArray and ArrowSchema (#1334)
---
 arrow/src/ffi.rs | 10 ++
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/arrow/src/ffi.rs b/arrow/src/ffi.rs
index 5a066cc..b7a22de 100644
--- a/arrow/src/ffi.rs
+++ b/arrow/src/ffi.rs
@@ -107,7 +107,7 @@ bitflags! {
 /// See <https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions>
 /// This was created by bindgen
 #[repr(C)]
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct FFI_ArrowSchema {
     format: *const c_char,
     name: *const c_char,
@@ -316,7 +316,7 @@ fn bit_width(data_type: &DataType, i: usize) -> Result<usize> {
 /// See <https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions>
 /// This was created by bindgen
 #[repr(C)]
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct FFI_ArrowArray {
     pub(crate) length: i64,
     pub(crate) null_count: i64,
@@ -721,9 +721,11 @@ impl ArrowArray {
                     .to_string(),
             ));
         };
+        let ffi_array = (*array).clone();
+        let ffi_schema = (*schema).clone();
         Ok(Self {
-            array: Arc::from_raw(array as *mut FFI_ArrowArray),
-            schema: Arc::from_raw(schema as *mut FFI_ArrowSchema),
+            array: Arc::new(ffi_array),
+            schema: Arc::new(ffi_schema),
         })
     }

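`Arc::from_raw` is only sound for pointers that were previously returned by `Arc::into_raw`, so a struct allocated by a foreign producer cannot be adopted that way. The sketch below illustrates the alternative the diff takes: copy the pointee and wrap the copy in a fresh `Arc`. `FfiStruct` and `import_by_copy` are hypothetical names used only for this example.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for an FFI struct owned by a foreign producer.
#[repr(C)]
#[derive(Debug, Clone, PartialEq)]
struct FfiStruct {
    length: i64,
    null_count: i64,
}

/// Imports a foreign struct by cloning it into a new, independently owned `Arc`.
/// The original allocation is left untouched and stays owned by the caller.
///
/// # Safety
/// `ptr` must be non-null, properly aligned, and point to an initialized `FfiStruct`.
unsafe fn import_by_copy(ptr: *const FfiStruct) -> Arc<FfiStruct> {
    unsafe { Arc::new((*ptr).clone()) }
}
```

The trade-off is that the caller remains responsible for freeing the original allocation, since the `Arc` only manages its own copy.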
[arrow] branch master updated: ARROW-15657: [C++][Java] Upgrade Apache ORC to 1.7.3
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 45041fc ARROW-15657: [C++][Java] Upgrade Apache ORC to 1.7.3 45041fc is described below commit 45041fcd92b72bd36c08ca8d03074ccef7d9d782 Author: Dongjoon Hyun AuthorDate: Sun Feb 13 11:26:14 2022 -0800 ARROW-15657: [C++][Java] Upgrade Apache ORC to 1.7.3 This PR aims to upgrade Apache ORC to 1.7.3 which is the 3rd maintenance release at 1.7 line. - https://orc.apache.org/news/2022/02/09/ORC-1.7.3/ - https://github.com/apache/orc/releases/tag/v1.7.3 Closes #12397 from dongjoon-hyun/ARROW-15657 Authored-by: Dongjoon Hyun Signed-off-by: Chao Sun --- cpp/thirdparty/versions.txt | 4 ++-- java/adapter/orc/pom.xml| 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/thirdparty/versions.txt b/cpp/thirdparty/versions.txt index 27a6ca5..6a86b4d 100644 --- a/cpp/thirdparty/versions.txt +++ b/cpp/thirdparty/versions.txt @@ -70,8 +70,8 @@ ARROW_OPENTELEMETRY_BUILD_VERSION=v1.1.0 ARROW_OPENTELEMETRY_BUILD_SHA256_CHECKSUM=f8fd3a47be382dc79c19d7e7efcf86a0dfbb5a237db6e0618dbb7eb8e058 ARROW_OPENTELEMETRY_PROTO_BUILD_VERSION=v0.11.0 ARROW_OPENTELEMETRY_PROTO_BUILD_SHA256_CHECKSUM=985367f8905e91018e636cbf0d83ab3f834b665c4f5899a27d10cae9657710e2 -ARROW_ORC_BUILD_VERSION=1.7.2 -ARROW_ORC_BUILD_SHA256_CHECKSUM=ef39bae755116fecb07fb4334656f984ba8ce7cdb1c64de078d3ed186b286007 +ARROW_ORC_BUILD_VERSION=1.7.3 +ARROW_ORC_BUILD_SHA256_CHECKSUM=535c4d7588172e85b8fc941cd0575d67f2155a55e4cd65d87d5b194b0bb28a31 ARROW_PROTOBUF_BUILD_VERSION=v3.18.1 ARROW_PROTOBUF_BUILD_SHA256_CHECKSUM=b8ab9bbdf0c6968cf20060794bc61e231fae82aaf69d6e3577c154181991f576 # Because of https://github.com/Tencent/rapidjson/pull/1323, we require diff --git a/java/adapter/orc/pom.xml b/java/adapter/orc/pom.xml index ed019ef..ab5146f 100644 --- a/java/adapter/orc/pom.xml 
+++ b/java/adapter/orc/pom.xml @@ -35,7 +35,7 @@ <groupId>org.apache.orc</groupId> <artifactId>orc-core</artifactId> -<version>1.7.2</version> +<version>1.7.3</version> <scope>test</scope>
[arrow-rs] branch master updated: Make rle decoder public (#1271)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git The following commit(s) were added to refs/heads/master by this push: new ce15d0c Make rle decoder public (#1271) ce15d0c is described below commit ce15d0cd7bad127799f3f0d5845e2a0267e670a9 Author: Ze'ev Maor AuthorDate: Sun Feb 6 03:44:01 2022 +0200 Make rle decoder public (#1271) --- parquet/src/encodings/mod.rs | 2 +- parquet/src/lib.rs | 14 +- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/parquet/src/encodings/mod.rs b/parquet/src/encodings/mod.rs index 6046dda..9577a8e 100644 --- a/parquet/src/encodings/mod.rs +++ b/parquet/src/encodings/mod.rs @@ -18,4 +18,4 @@ pub mod decoding; pub mod encoding; pub mod levels; -pub(crate) mod rle; +experimental_mod_crate!(rle); diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs index 1799f9c..b50fab4 100644 --- a/parquet/src/lib.rs +++ b/parquet/src/lib.rs @@ -68,6 +68,18 @@ macro_rules! experimental_mod { }; } +macro_rules! experimental_mod_crate { +($module:ident $(, #[$meta:meta])*) => { +#[cfg(feature = "experimental")] +#[doc(hidden)] +$(#[$meta])* +pub mod $module; +#[cfg(not(feature = "experimental"))] +$(#[$meta])* +pub(crate) mod $module; +}; +} + #[macro_use] pub mod errors; pub mod basic; @@ -89,7 +101,7 @@ experimental_mod!(util, #[macro_use]); pub mod arrow; pub mod column; experimental_mod!(compression); -mod encodings; +experimental_mod!(encodings); pub mod file; pub mod record; pub mod schema;
[arrow-rs] branch any updated: Replace ambiguous Any with All in comments (#1126)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch any in repository https://gitbox.apache.org/repos/asf/arrow-rs.git The following commit(s) were added to refs/heads/any by this push: new e0b3105 Replace ambiguous Any with All in comments (#1126) e0b3105 is described below commit e0b3105ab82c652e523df3df02d23fc808f9c0e9 Author: Dominik Moritz AuthorDate: Tue Jan 4 12:29:14 2022 -0500 Replace ambiguous Any with All in comments (#1126) --- parquet/src/file/properties.rs | 18 +- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index c48e4e7..a44bbba 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -308,13 +308,13 @@ impl WriterPropertiesBuilder { } // -- -// Setters for any column (global) +// Setters for all columns -/// Sets encoding for any column. +/// Sets encoding for all columns. /// -/// If dictionary is not enabled, this is treated as a primary encoding for all -/// columns. In case when dictionary is enabled for any column, this value is -/// considered to be a fallback encoding for that column. +/// If dictionary is not enabled, the provided value is treated as a primary +/// encoding for all columns. When dictionary is enabled for a column, this +/// value is considered to be a fallback encoding for that column. /// /// Panics if user tries to set dictionary encoding here, regardless of dictionary /// encoding flag being set. @@ -323,13 +323,13 @@ impl WriterPropertiesBuilder { self } -/// Sets compression codec for any column. +/// Sets compression codec for all columns. pub fn set_compression(mut self, value: Compression) -> Self { self.default_column_properties.set_compression(value); self } -/// Sets flag to enable/disable dictionary encoding for any column. +/// Sets flag to enable/disable dictionary encoding for all columns. 
/// /// Use this method to set dictionary encoding, instead of explicitly specifying /// encoding in `set_encoding` method. @@ -338,13 +338,13 @@ impl WriterPropertiesBuilder { self } -/// Sets flag to enable/disable statistics for any column. +/// Sets flag to enable/disable statistics for all columns. pub fn set_statistics_enabled(mut self, value: bool) -> Self { self.default_column_properties.set_statistics_enabled(value); self } -/// Sets max statistics size for any column. +/// Sets max statistics size for all columns. /// Applicable only if statistics are enabled. pub fn set_max_statistics_size(mut self, value: usize) -> Self { self.default_column_properties
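The reworded comments above describe builder setters that apply to all columns. A minimal sketch of that idea follows, assuming a builder that stores one set of defaults and lets a per-column setting take precedence. `PropsBuilder`, `ColumnProps`, `set_column_compression` and the string codec names are hypothetical stand-ins for illustration, not the parquet crate's types.

```rust
use std::collections::HashMap;

/// Hypothetical per-column settings; `None` means "not set here".
#[derive(Debug, Default)]
struct ColumnProps {
    compression: Option<String>,
}

/// Hypothetical builder: the "all columns" setter writes to `default_props`,
/// the per-column setter writes to `column_props`.
#[derive(Debug, Default)]
struct PropsBuilder {
    default_props: ColumnProps,
    column_props: HashMap<String, ColumnProps>,
}

impl PropsBuilder {
    /// Sets the compression codec for all columns (the default).
    fn set_compression(mut self, value: &str) -> Self {
        self.default_props.compression = Some(value.to_string());
        self
    }

    /// Sets the compression codec for one named column (assumed override).
    fn set_column_compression(mut self, column: &str, value: &str) -> Self {
        self.column_props
            .entry(column.to_string())
            .or_default()
            .compression = Some(value.to_string());
        self
    }

    /// Resolves the codec for a column: the per-column value wins,
    /// otherwise the all-columns default is used.
    fn compression(&self, column: &str) -> Option<&str> {
        self.column_props
            .get(column)
            .and_then(|p| p.compression.as_deref())
            .or(self.default_props.compression.as_deref())
    }
}
```

Under these assumptions, calling `set_compression("SNAPPY")` and then `set_column_compression("a", "ZSTD")` resolves column `a` to `ZSTD` and every other column to `SNAPPY`, which is the distinction the "for all columns" wording is meant to make clear.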
[arrow-rs] branch master updated (07660c6 -> ab48e69)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git. from 07660c6 Simplify parquet arror `RecordReader` (#1021) add ab48e69 Extract method to drive PageIterator -> RecordReader (#1031) No new revisions were added by this update. Summary of changes: parquet/src/arrow/array_reader.rs | 88 --- 1 file changed, 37 insertions(+), 51 deletions(-)
[arrow] branch master updated (36367fe -> 902b541)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git. from 36367fe ARROW-14676: [R][Docs] Add article on how to build a few different setups via docker to dev docs add 902b541 ARROW-14718: [Java] loadValidityBuffer should avoid allocating memory when input is not null and there are only null or non-null values No new revisions were added by this update. Summary of changes: .../org/apache/arrow/vector/BitVectorHelper.java | 11 - .../apache/arrow/vector/TestBitVectorHelper.java | 56 ++ 2 files changed, 65 insertions(+), 2 deletions(-)
[arrow-rs] branch master updated: Simplify parquet arror `RecordReader` (#1021)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git The following commit(s) were added to refs/heads/master by this push: new 07660c6 Simplify parquet arror `RecordReader` (#1021) 07660c6 is described below commit 07660c61680220ac54b7bf4c42a64c840872cc43 Author: Raphael Taylor-Davies <1781103+tustv...@users.noreply.github.com> AuthorDate: Mon Dec 13 21:44:47 2021 + Simplify parquet arror `RecordReader` (#1021) --- parquet/src/arrow/record_reader.rs | 73 +- 1 file changed, 33 insertions(+), 40 deletions(-) diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index 4dd7da9..a5c0b47 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -43,10 +43,8 @@ pub struct RecordReader { /// Number of values `num_records` contains. num_values: usize, -values_seen: usize, /// Starts from 1, number of values have been written to buffer values_written: usize, -in_middle_of_record: bool, } impl RecordReader { @@ -75,9 +73,7 @@ impl RecordReader { column_desc: column_schema, num_records: 0, num_values: 0, -values_seen: 0, values_written: 0, -in_middle_of_record: false, } } @@ -107,21 +103,25 @@ impl RecordReader { loop { // Try to find some records from buffers that has been read into memory // but not counted as seen records. 
-records_read += self.split_records(num_records - records_read)?; - -// Since page reader contains complete records, so if we reached end of a -// page reader, we should reach the end of a record -if end_of_column -&& self.values_seen >= self.values_written -&& self.in_middle_of_record -{ -self.num_records += 1; -self.num_values = self.values_seen; -self.in_middle_of_record = false; -records_read += 1; +let (record_count, value_count) = +self.count_records(num_records - records_read); + +self.num_records += record_count; +self.num_values += value_count; +records_read += record_count; + +if records_read == num_records { +break; } -if (records_read >= num_records) || end_of_column { +if end_of_column { +// Since page reader contains complete records, if we reached end of a +// page reader, we should reach the end of a record +if self.rep_levels.is_some() { +self.num_records += 1; +self.num_values = self.values_written; +records_read += 1; +} break; } @@ -265,8 +265,6 @@ impl RecordReader { self.values_written -= self.num_values; self.num_records = 0; self.num_values = 0; -self.values_seen = 0; -self.in_middle_of_record = false; } /// Returns bitmap data. @@ -367,10 +365,11 @@ impl RecordReader { Ok(values_read) } -/// Split values into records according repetition definition and returns number of -/// records read. 
-#[allow(clippy::unnecessary_wraps)] -fn split_records(&mut self, records_to_read: usize) -> Result<usize> { +/// Inspects the buffered repetition levels in the range `self.num_values..self.values_written` +/// and returns the number of "complete" records along with the corresponding number of values +/// +/// A "complete" record is one where the buffer contains a subsequent repetition level of 0 +fn count_records(&self, records_to_read: usize) -> (usize, usize) { let rep_levels = self.rep_levels.as_ref().map(|buf| { let (prefix, rep_levels, suffix) = unsafe { buf.as_slice().align_to::<i16>() }; @@ -381,32 +380,26 @@ impl RecordReader { match rep_levels { Some(buf) => { let mut records_read = 0; +let mut end_of_last_record = self.num_values; + +for current in self.num_values..self.values_written { +if buf[current] == 0 && current != self.num_values { +records_read += 1; +end_of_last_record = current; -while (self.values_seen < self.values_written) -&& (records_read < records_to_read) -{ -if buf[self.values_seen] == 0 { -if self.in_middle_of_record { -records_read += 1; -
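The doc comment added in this diff defines a "complete" record as one followed by a later repetition level of 0. The sketch below works through that rule as a free function over a slice. The diff is cut off before the end of the loop, so the early exit once `records_to_read` is reached and the returned value count are assumptions, and the `start`/`end` parameters stand in for `self.num_values` and `self.values_written`.

```rust
/// Counts complete records in `rep_levels[start..end]`.
///
/// Returns `(records, values)`: the number of complete records found, capped
/// at `records_to_read`, and the number of values those records span.
/// A record is complete only once a later repetition level of 0 is seen.
fn count_records(
    rep_levels: &[i16],
    start: usize,
    end: usize,
    records_to_read: usize,
) -> (usize, usize) {
    // Guard (assumption): with nothing requested, nothing is counted.
    if records_to_read == 0 {
        return (0, 0);
    }

    let mut records_read = 0;
    let mut end_of_last_record = start;

    for current in start..end {
        // A level of 0 starts a new record, which closes the previous one.
        // The very first position only opens a record, so it is skipped.
        if rep_levels[current] == 0 && current != start {
            records_read += 1;
            end_of_last_record = current;

            // Assumed stop condition: the requested count has been reached.
            if records_read == records_to_read {
                break;
            }
        }
    }

    (records_read, end_of_last_record - start)
}
```

For the levels `[0, 1, 1, 0, 1, 0, 0]` scanned from 0 to 7, this reports three complete records spanning six values. The final value opens a fourth record that cannot be counted until more levels arrive, which appears to be why the caller in the diff adds one more record at the end of the column.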
[arrow-rs] branch master updated: Write FixedLenByteArray stats for FixedLenByteArray columns (not ByteArray stats) (#662)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git The following commit(s) were added to refs/heads/master by this push: new fa5acd9 Write FixedLenByteArray stats for FixedLenByteArray columns (not ByteArray stats) (#662) fa5acd9 is described below commit fa5acd971c973161f17e69d5c6b50d6e77c7da03 Author: Andrew Lamb AuthorDate: Mon Aug 9 20:58:03 2021 -0400 Write FixedLenByteArray stats for FixedLenByteArray columns (not ByteArray stats) (#662) --- parquet/src/column/writer.rs | 31 --- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs index af76c84..0da9439 100644 --- a/parquet/src/column/writer.rs +++ b/parquet/src/column/writer.rs @@ -924,11 +924,28 @@ impl ColumnWriterImpl { Type::INT96 => gen_stats_section!(Int96, int96, min, max, distinct, nulls), Type::FLOAT => gen_stats_section!(f32, float, min, max, distinct, nulls), Type::DOUBLE => gen_stats_section!(f64, double, min, max, distinct, nulls), -Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => { +Type::BYTE_ARRAY => { let min = min.as_ref().map(|v| ByteArray::from(v.as_bytes().to_vec())); let max = max.as_ref().map(|v| ByteArray::from(v.as_bytes().to_vec())); Statistics::byte_array(min, max, distinct, nulls, false) } +Type::FIXED_LEN_BYTE_ARRAY => { +let min = min +.as_ref() +.map(|v| ByteArray::from(v.as_bytes().to_vec())) +.map(|ba| { +let ba: FixedLenByteArray = ba.into(); +ba +}); +let max = max +.as_ref() +.map(|v| ByteArray::from(v.as_bytes().to_vec())) +.map(|ba| { +let ba: FixedLenByteArray = ba.into(); +ba +}); +Statistics::fixed_len_byte_array(min, max, distinct, nulls, false) +} } } @@ -1797,13 +1814,13 @@ mod tests { let stats = statistics_roundtrip::(); assert!(stats.has_min_max_set()); -// should it be FixedLenByteArray? 
-// https://github.com/apache/arrow-rs/issues/660 -if let Statistics::ByteArray(stats) = stats { -assert_eq!(stats.min(), &ByteArray::from("aaw ")); -assert_eq!(stats.max(), &ByteArray::from("zz ")); +if let Statistics::FixedLenByteArray(stats) = stats { +let expected_min: FixedLenByteArray = ByteArray::from("aaw ").into(); +assert_eq!(stats.min(), &expected_min); +let expected_max: FixedLenByteArray = ByteArray::from("zz ").into(); +assert_eq!(stats.max(), &expected_max); } else { -panic!("expecting Statistics::ByteArray, got {:?}", stats); +panic!("expecting Statistics::FixedLenByteArray, got {:?}", stats); } }
[arrow] branch master updated (8d5e6e8 -> e7f005d)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git. from 8d5e6e8 ARROW-12636: [JS] ESM Tree-Shaking produces broken code add e7f005d ARROW-13506: [C++][Java] Upgrade ORC to 1.6.9 No new revisions were added by this update. Summary of changes: cpp/thirdparty/versions.txt | 2 +- java/adapter/orc/pom.xml| 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-)
[arrow-rs] branch master updated (1d72b26 -> 94a82cd)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git. from 1d72b26 allow to read non-standard CSV (#326) add 94a82cd ensure null-counts are written for all-null columns (#307) No new revisions were added by this update. Summary of changes: parquet/src/arrow/arrow_writer.rs | 17 + parquet/src/column/writer.rs | 4 +++- 2 files changed, 20 insertions(+), 1 deletion(-)
[arrow-rs] branch master updated: parquet: Speed up `BitReader`/`DeltaBitPackDecoder` (#325)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git The following commit(s) were added to refs/heads/master by this push: new b2de544 parquet: Speed up `BitReader`/`DeltaBitPackDecoder` (#325) b2de544 is described below commit b2de5446cc1e45a0559fb39039d0545df1ac0d26 Author: Kornelijus Survila AuthorDate: Sun May 23 19:00:42 2021 -0600 parquet: Speed up `BitReader`/`DeltaBitPackDecoder` (#325) * parquet: Avoid temporary `BufferPtr`s in `BitReader` From a quick test, this speeds up reading delta-packed int columns by over 30%. * parquet: Avoid some allocations in `DeltaBitPackDecoder` From a quick test, it seems to decode around 10% faster overall. --- parquet/src/encodings/decoding.rs | 7 +++ parquet/src/util/bit_util.rs | 13 +++-- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/parquet/src/encodings/decoding.rs b/parquet/src/encodings/decoding.rs index b73ebf0..e83e277 100644 --- a/parquet/src/encodings/decoding.rs +++ b/parquet/src/encodings/decoding.rs @@ -395,16 +395,15 @@ impl DeltaBitPackDecoder { .get_zigzag_vlq_int() .ok_or_else(|| eof_err!("Not enough data to decode 'min_delta'"))?; -let mut widths = vec![]; +self.delta_bit_widths.clear(); for _ in 0..self.num_mini_blocks { let w = self .bit_reader .get_aligned::(1) .ok_or_else(|| eof_err!("Not enough data to decode 'width'"))?; -widths.push(w); +self.delta_bit_widths.push(w); } -self.delta_bit_widths.set_data(widths); self.mini_block_idx = 0; self.delta_bit_width = self.delta_bit_widths.data()[0]; self.values_current_mini_block = self.values_per_mini_block; @@ -417,7 +416,6 @@ impl DeltaBitPackDecoder { where T::T: FromBytes, { -self.deltas_in_mini_block.clear(); if self.use_batch { self.deltas_in_mini_block .resize(self.values_current_mini_block, T::T::default()); @@ -427,6 +425,7 @@ impl DeltaBitPackDecoder { ); assert!(loaded == self.values_current_mini_block); } else { 
+self.deltas_in_mini_block.clear(); for _ in 0..self.values_current_mini_block { // TODO: load one batch at a time similar to int32 let delta = self diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs index 677b669..8dfb631 100644 --- a/parquet/src/util/bit_util.rs +++ b/parquet/src/util/bit_util.rs @@ -603,11 +603,7 @@ impl BitReader { // Advance byte_offset to next unread byte and read num_bytes self.byte_offset += bytes_read; -let v = read_num_bytes!( -T, -num_bytes, -self.buffer.start_from(self.byte_offset).as_ref() -); +let v = read_num_bytes!(T, num_bytes, self.buffer.data()[self.byte_offset..]); self.byte_offset += num_bytes; // Reset buffered_values @@ -657,11 +653,8 @@ impl BitReader { fn reload_buffer_values(&mut self) { let bytes_to_read = cmp::min(self.total_bytes - self.byte_offset, 8); -self.buffered_values = read_num_bytes!( -u64, -bytes_to_read, -self.buffer.start_from(self.byte_offset).as_ref() -); +self.buffered_values = +read_num_bytes!(u64, bytes_to_read, self.buffer.data()[self.byte_offset..]); } }
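The `DeltaBitPackDecoder` change above swaps a freshly allocated `vec![]` per block for `clear()` followed by `push` on a field that lives as long as the decoder. A minimal sketch of that pattern follows, assuming a simplified decoder that reads one width byte per mini block from a plain byte slice. `WidthDecoder` and `load_widths` are hypothetical names, not the crate's API.

```rust
/// Hypothetical decoder state that keeps its scratch buffer between blocks.
struct WidthDecoder {
    delta_bit_widths: Vec<u8>,
}

impl WidthDecoder {
    fn new() -> Self {
        Self {
            delta_bit_widths: Vec::new(),
        }
    }

    /// Loads `num_mini_blocks` width bytes from the front of `input` into the
    /// reused buffer. Returns `None` if `input` is too short.
    fn load_widths(&mut self, input: &[u8], num_mini_blocks: usize) -> Option<()> {
        // `clear` drops the old contents but keeps the allocation, so later
        // blocks of the same size do not allocate again.
        self.delta_bit_widths.clear();
        let widths = input.get(..num_mini_blocks)?;
        self.delta_bit_widths.extend_from_slice(widths);
        Some(())
    }
}
```

The saving comes from `Vec::clear` leaving the capacity untouched: after the first block the buffer is already large enough, so refilling it is a copy into existing memory rather than a new heap allocation.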
[arrow] branch master updated: ARROW-10297: [Rust] Parameter for parquet-read to output data in json format, add "cli" feature to parquet crate
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new e0b3c9a ARROW-10297: [Rust] Parameter for parquet-read to output data in json format, add "cli" feature to parquet crate e0b3c9a is described below commit e0b3c9af5f33df948af01e458542a25e73b24b1e Author: Manoj Karthick AuthorDate: Sun Jan 31 12:18:08 2021 -0800 ARROW-10297: [Rust] Parameter for parquet-read to output data in json format, add "cli" feature to parquet crate Add an option to print output in JSON format. in the parquet-read binary. Having json output allows for easy analysis using tools like [jq](https://stedolan.github.io/jq/). This PR builds on the changes implemented in https://github.com/apache/arrow/pull/8686 and incorporates the suggestions in that PR. **Changelog** * Update all three binaries `parquet-schema`, `parquet-rowcount` and `parquet-read` to use [clap](https://github.com/clap-rs/clap) for argument parsing * Add `to_json_value()` method to get `serde_json::Value` from `Row` and `Field` structs (Thanks to @jhorstmann for these changes!) * parquet-schema: * Convert verbose argument into `-v/--verbose` flag * parquet-read: * Add a new flag `-j/--json` that prints the file contents in json lines format * The feature is gated under the `json_output` cargo feature * Update documentation and README with instructions for running * The binaries now use version and author information as defined in Cargo.toml Example output: ``` ❯ parquet-read cities.parquet 3 --json {"continent":"Europe","country":{"name":"France","city":["Paris","Nice","Marseilles","Cannes"]}} {"continent":"Europe","country":{"name":"Greece","city":["Athens","Piraeus","Hania","Heraklion","Rethymnon","Fira"]}} {"continent":"North America","country":{"name":"Canada","city":["Toronto","Vancouver","St. 
John's","Saint John","Montreal","Halifax","Winnipeg","Calgary","Saskatoon","Ottawa","Yellowknife"]}} ``` Closes #9306 from manojkarthick/rust-parquet-bin-clap Authored-by: Manoj Karthick Signed-off-by: Chao Sun --- rust/parquet/Cargo.toml | 15 +++ rust/parquet/README.md | 15 ++- rust/parquet/src/bin/parquet-read.rs | 75 ++--- rust/parquet/src/bin/parquet-rowcount.rs | 36 -- rust/parquet/src/bin/parquet-schema.rs | 62 +++ rust/parquet/src/record/api.rs | 183 ++- 6 files changed, 322 insertions(+), 64 deletions(-) diff --git a/rust/parquet/Cargo.toml b/rust/parquet/Cargo.toml index 11c35a7..f7be4ee 100644 --- a/rust/parquet/Cargo.toml +++ b/rust/parquet/Cargo.toml @@ -43,6 +43,8 @@ chrono = "0.4" num-bigint = "0.3" arrow = { path = "../arrow", version = "4.0.0-SNAPSHOT", optional = true } base64 = { version = "0.12", optional = true } +clap = { version = "2.33.3", optional = true } +serde_json = { version = "1.0", features = ["preserve_order"], optional = true } [dev-dependencies] rand = "0.8" @@ -56,3 +58,16 @@ serde_json = { version = "1.0", features = ["preserve_order"] } [features] default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"] +cli = ["serde_json", "base64", "clap"] + +[[ bin ]] +name = "parquet-read" +required-features = ["cli"] + +[[ bin ]] +name = "parquet-schema" +required-features = ["cli"] + +[[ bin ]] +name = "parquet-rowcount" +required-features = ["cli"] diff --git a/rust/parquet/README.md b/rust/parquet/README.md index cac8ac4..252312a 100644 --- a/rust/parquet/README.md +++ b/rust/parquet/README.md @@ -79,23 +79,22 @@ enabled by adding `RUSTFLAGS="-C target-feature=+sse4.2"` before the `cargo build` command. ## Test -Run `cargo test` for unit tests. +Run `cargo test` for unit tests. To also run tests related to the binaries, use `cargo test --features cli`. 
## Binaries -The following binaries are provided (use `cargo install` to install them): +The following binaries are provided (use `cargo install --features cli` to install them): - **parquet-schema** for printing Parquet f
[arrow] branch master updated (9e73081 -> 25b0b1b)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git. from 9e73081 ARROW-9733: [Rust] [DataFusion] Added support for COUNT/MIN/MAX on string columns add 25b0b1b ARROW-9790: [Rust][Parquet] Fix PrimitiveArrayReader boundary conditions No new revisions were added by this update. Summary of changes: rust/parquet/src/arrow/array_reader.rs | 62 +++--- rust/parquet/src/arrow/arrow_reader.rs | 39 + 2 files changed, 97 insertions(+), 4 deletions(-)
[arrow] branch master updated (d9525f1 -> 2e702f0)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git. from d9525f1 ARROW-9503: [Rust] Comparison sliced arrays is wrong add 2e702f0 ARROW-9461: [Rust] Fixed error in reading Date32 and Date64. No new revisions were added by this update. Summary of changes: rust/parquet/src/arrow/array_reader.rs | 23 --- rust/parquet/src/arrow/converter.rs| 109 +++-- 2 files changed, 59 insertions(+), 73 deletions(-)
[arrow] branch master updated (210d360 -> d9525f1)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git. from 210d360 ARROW-9506: [Packaging][Python] Fix macOS wheel build failures add d9525f1 ARROW-9503: [Rust] Comparison sliced arrays is wrong No new revisions were added by this update. Summary of changes: rust/arrow/src/compute/kernels/comparison.rs | 26 -- 1 file changed, 20 insertions(+), 6 deletions(-)
[arrow] branch master updated (2c49463 -> 7d377ba)
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git. from 2c49463 ARROW-8263 [Rust] [DataFusion] Added some documentation to available SQL functions. add 7d377ba ARROW-8559: [Rust] Consolidate Record Batch reader traits in main arrow crate No new revisions were added by this update. Summary of changes: rust/arrow/src/array/builder.rs| 6 +- rust/arrow/src/compute/kernels/arithmetic.rs | 6 +- rust/arrow/src/compute/util.rs | 13 ++- rust/arrow/src/csv/reader.rs | 16 +-- rust/arrow/src/datatypes.rs| 25 +++-- rust/arrow/src/error.rs| 41 --- rust/arrow/src/flight/mod.rs | 5 +- rust/arrow/src/ipc/reader.rs | 88 +++ rust/arrow/src/ipc/writer.rs | 9 +- rust/arrow/src/json/reader.rs | 32 +++--- rust/arrow/src/record_batch.rs | 21 ++-- rust/arrow/src/util/integration_util.rs| 2 +- rust/datafusion/src/datasource/csv.rs | 23 ++-- rust/datafusion/src/datasource/datasource.rs | 10 +- rust/datafusion/src/datasource/memory.rs | 14 +-- rust/datafusion/src/datasource/parquet.rs | 30 +++-- rust/datafusion/src/error.rs | 7 ++ rust/datafusion/src/execution/context.rs | 10 +- .../src/execution/physical_plan/common.rs | 26 +++-- rust/datafusion/src/execution/physical_plan/csv.rs | 31 +++--- .../src/execution/physical_plan/datasource.rs | 23 ++-- .../src/execution/physical_plan/expressions.rs | 2 +- .../src/execution/physical_plan/hash_aggregate.rs | 121 + .../src/execution/physical_plan/limit.rs | 24 ++-- .../src/execution/physical_plan/memory.rs | 29 ++--- .../src/execution/physical_plan/merge.rs | 16 +-- rust/datafusion/src/execution/physical_plan/mod.rs | 18 +-- .../src/execution/physical_plan/parquet.rs | 52 + .../src/execution/physical_plan/projection.rs | 38 --- .../src/execution/physical_plan/selection.rs | 35 +++--- .../datafusion/src/execution/physical_plan/sort.rs | 12 +- rust/datafusion/src/sql/planner.rs | 4 +- rust/datafusion/src/test/mod.rs| 4 +- 
rust/datafusion/tests/sql.rs | 4 +- .../src/bin/arrow-file-to-stream.rs| 3 +- .../src/bin/arrow-stream-to-file.rs| 3 +- rust/parquet/src/arrow/arrow_reader.rs | 4 +- 37 files changed, 424 insertions(+), 383 deletions(-)
[arrow] branch master updated: ARROW-9280: [Rust] [Parquet] Calculate page and column statistics
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git The following commit(s) were added to refs/heads/master by this push: new 12b30dd ARROW-9280: [Rust] [Parquet] Calculate page and column statistics 12b30dd is described below commit 12b30dda1a23bad70e5b11b8cef845d0effd01d4 Author: Ze'ev Maor AuthorDate: Thu Jul 2 17:14:16 2020 -0700 ARROW-9280: [Rust] [Parquet] Calculate page and column statistics Allow writer to provide pre-calculated stats Closes #7622 from zeevm/calculate_parquet_statistics Authored-by: Ze'ev Maor Signed-off-by: Chao Sun --- rust/parquet/src/column/writer.rs | 317 +++--- rust/parquet/src/data_type.rs | 53 ++- 2 files changed, 339 insertions(+), 31 deletions(-) diff --git a/rust/parquet/src/column/writer.rs b/rust/parquet/src/column/writer.rs index c54c478..f26c37b 100644 --- a/rust/parquet/src/column/writer.rs +++ b/rust/parquet/src/column/writer.rs @@ -16,23 +16,25 @@ // under the License. //! Contains column writer API. - use std::{cmp, collections::VecDeque, rc::Rc}; use crate::basic::{Compression, Encoding, PageType, Type}; use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter}; use crate::compression::{create_codec, Codec}; +use crate::data_type::AsBytes; use crate::data_type::*; use crate::encodings::{ encoding::{get_encoder, DictEncoder, Encoder}, levels::{max_buffer_size, LevelEncoder}, }; use crate::errors::{ParquetError, Result}; +use crate::file::statistics::Statistics; use crate::file::{ metadata::ColumnChunkMetaData, properties::{WriterProperties, WriterPropertiesPtr, WriterVersion}, }; use crate::schema::types::ColumnDescPtr; +use crate::util::bit_util::FromBytes; use crate::util::memory::{ByteBufferPtr, MemTracker}; /// Column writer for a Parquet type. 
@@ -47,6 +49,33 @@ pub enum ColumnWriter { FixedLenByteArrayColumnWriter(ColumnWriterImpl), } +pub enum Level { +Page, +Column, +} + +macro_rules! gen_stats_section { +($physical_ty: ty, $stat_fn: ident, $min: ident, $max: ident, $distinct: ident, $nulls: ident) => {{ +let min = $min.as_ref().and_then(|v| { +println!("min: {:?} {}", &v.as_bytes(), v.as_bytes().len()); +Some(read_num_bytes!( +$physical_ty, +v.as_bytes().len(), +&v.as_bytes() +)) +}); +let max = $max.as_ref().and_then(|v| { +println!("max: {:?} {}", &v.as_bytes(), v.as_bytes().len()); +Some(read_num_bytes!( +$physical_ty, +v.as_bytes().len(), +&v.as_bytes() +)) +}); +Statistics::$stat_fn(min, max, $distinct, $nulls, false) +}}; +} + /// Gets a specific column writer corresponding to column descriptor `descr`. pub fn get_column_writer( descr: ColumnDescPtr, @@ -149,6 +178,10 @@ pub struct ColumnWriterImpl { num_buffered_values: u32, num_buffered_encoded_values: u32, num_buffered_rows: u32, +min_page_value: Option, +max_page_value: Option, +num_page_nulls: u64, +page_distinct_count: Option, // Metrics per column writer total_bytes_written: u64, total_rows_written: u64, @@ -157,6 +190,10 @@ pub struct ColumnWriterImpl { total_num_values: u64, dictionary_page_offset: Option, data_page_offset: Option, +min_column_value: Option, +max_column_value: Option, +num_column_nulls: u64, +column_distinct_count: Option, // Reused buffers def_levels_sink: Vec, rep_levels_sink: Vec, @@ -216,26 +253,26 @@ impl ColumnWriterImpl { def_levels_sink: vec![], rep_levels_sink: vec![], data_pages: VecDeque::new(), +min_page_value: None, +max_page_value: None, +num_page_nulls: 0, +page_distinct_count: None, +min_column_value: None, +max_column_value: None, +num_column_nulls: 0, +column_distinct_count: None, } } -/// Writes batch of values, definition levels and repetition levels. -/// Returns number of values processed (written). 
-/// -/// If definition and repetition levels are provided, we write fully those levels and -/// select how many values to write (this number will be returned), since number of -/// actual written values may be smaller than provided values. -/// -/// If only values are provided, then all values are written and the length of -/// of the values buffer is returned. -/// -/// Definition and/or repetition levels can be omitted, if values are -/// non-nullable and/or non-repeated. -pub fn write_batch( +fn write_batch_internal( &mut self,
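The new fields in this diff track minimum, maximum and null counts at two levels, page and column. A minimal sketch of how the two levels might relate follows, assuming page statistics are folded into the column statistics whenever a page is finished. `StatsTracker`, `update` and `flush_page` are hypothetical names, and `i32` stands in for the writer's generic value type.

```rust
/// Hypothetical running statistics for one column of `i32` values.
#[derive(Debug, Default)]
struct StatsTracker {
    min_page_value: Option<i32>,
    max_page_value: Option<i32>,
    num_page_nulls: u64,
    min_column_value: Option<i32>,
    max_column_value: Option<i32>,
    num_column_nulls: u64,
}

impl StatsTracker {
    /// Records one value (or a null) against the current page.
    fn update(&mut self, value: Option<i32>) {
        match value {
            None => self.num_page_nulls += 1,
            Some(v) => {
                self.min_page_value = Some(self.min_page_value.map_or(v, |m| m.min(v)));
                self.max_page_value = Some(self.max_page_value.map_or(v, |m| m.max(v)));
            }
        }
    }

    /// Finishes the current page: returns its `(min, max, nulls)`, folds them
    /// into the column-level statistics, and resets the page-level state.
    fn flush_page(&mut self) -> (Option<i32>, Option<i32>, u64) {
        let min = self.min_page_value.take();
        let max = self.max_page_value.take();
        let nulls = std::mem::take(&mut self.num_page_nulls);

        if let Some(v) = min {
            self.min_column_value = Some(self.min_column_value.map_or(v, |m| m.min(v)));
        }
        if let Some(v) = max {
            self.max_column_value = Some(self.max_column_value.map_or(v, |m| m.max(v)));
        }
        self.num_column_nulls += nulls;

        (min, max, nulls)
    }
}
```

Keeping page-level state separate means each page can carry its own bounds while the column chunk still ends up with the overall minimum, maximum and null count. A writer that accepts pre-calculated statistics, as the commit message mentions, would presumably skip `update` and supply these values directly.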
[arrow] branch master updated: ARROW-9290: [Rust] [Parquet] Add features to allow opting out of dependencies
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git

The following commit(s) were added to refs/heads/master by this push:
     new 2840e3f  ARROW-9290: [Rust] [Parquet] Add features to allow opting out of dependencies
2840e3f is described below

commit 2840e3f01be65676a511d7eb25b48e46e358ecef
Author: Ben Kimock
AuthorDate: Thu Jul 2 15:55:59 2020 -0700

    ARROW-9290: [Rust] [Parquet] Add features to allow opting out of dependencies

    Closes #7610 from saethlin/rust-dep-slimming

    Authored-by: Ben Kimock
    Signed-off-by: Chao Sun
---
 rust/parquet/Cargo.toml         |  24 ++-
 rust/parquet/src/compression.rs | 353 +++-
 rust/parquet/src/errors.rs      | 120 +-
 rust/parquet/src/lib.rs         |   1 +
 4 files changed, 300 insertions(+), 198 deletions(-)

diff --git a/rust/parquet/Cargo.toml b/rust/parquet/Cargo.toml
index c3b6d0b..68a92aa 100644
--- a/rust/parquet/Cargo.toml
+++ b/rust/parquet/Cargo.toml
@@ -29,20 +29,28 @@ build = "build.rs"
 edition = "2018"
 
 [dependencies]
-parquet-format = "~2.6"
-quick-error = "1.2"
+parquet-format = "2.6.1"
 byteorder = "1"
 thrift = "0.13"
-snap = "1.0"
-brotli = "3.3"
-flate2 = "1.0"
-lz4 = "1.23"
-zstd = "0.5"
+snap = { version = "1.0", optional = true }
+brotli = { version = "3.3", optional = true }
+flate2 = { version = "1.0", optional = true }
+lz4 = { version = "1.23", optional = true }
+zstd = { version = "0.5", optional = true }
 chrono = "0.4"
 num-bigint = "0.2"
-arrow = { path = "../arrow", version = "1.0.0-SNAPSHOT" }
+arrow = { path = "../arrow", version = "1.0.0-SNAPSHOT", optional = true }
 serde_json = { version = "1.0", features = ["preserve_order"] }
 
 [dev-dependencies]
 lazy_static = "1"
 rand = "0.6"
+snap = "1.0"
+brotli = "3.3"
+flate2 = "1.0"
+lz4 = "1.23"
+zstd = "0.5"
+arrow = { path = "../arrow", version = "1.0.0-SNAPSHOT" }
+
+[features]
+default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd"]
diff --git a/rust/parquet/src/compression.rs b/rust/parquet/src/compression.rs
index 871bac1..9b4ac71 100644
--- a/rust/parquet/src/compression.rs
+++ b/rust/parquet/src/compression.rs
@@ -22,7 +22,7 @@
 //!
 //! # Example
 //!
-//! ```rust
+//! ```no_run
 //! use parquet::{basic::Compression, compression::create_codec};
 //!
 //! let mut codec = match create_codec(Compression::SNAPPY) {
@@ -40,14 +40,6 @@
 //! assert_eq!(output, data);
 //! ```
 
-use std::io::{self, Read, Write};
-
-use brotli;
-use flate2::{read, write, Compression};
-use lz4;
-use snap::raw::{decompress_len, max_compress_len, Decoder, Encoder};
-use zstd;
-
 use crate::basic::Compression as CodecType;
 use crate::errors::{ParquetError, Result};
 
@@ -70,202 +62,261 @@ pub trait Codec {
 /// This returns `None` if the codec type is `UNCOMPRESSED`.
 pub fn create_codec(codec: CodecType) -> Result>> {
     match codec {
+        #[cfg(any(feature = "brotli", test))]
         CodecType::BROTLI => Ok(Some(Box::new(BrotliCodec::new()))),
+        #[cfg(any(feature = "flate2", test))]
         CodecType::GZIP => Ok(Some(Box::new(GZipCodec::new()))),
+        #[cfg(any(feature = "snap", test))]
         CodecType::SNAPPY => Ok(Some(Box::new(SnappyCodec::new()))),
+        #[cfg(any(feature = "lz4", test))]
         CodecType::LZ4 => Ok(Some(Box::new(LZ4Codec::new()))),
+        #[cfg(any(feature = "zstd", test))]
         CodecType::ZSTD => Ok(Some(Box::new(ZSTDCodec::new()))),
         CodecType::UNCOMPRESSED => Ok(None),
         _ => Err(nyi_err!("The codec type {} is not supported yet", codec)),
     }
 }
 
-/// Codec for Snappy compression format.
-pub struct SnappyCodec {
-    decoder: Decoder,
-    encoder: Encoder,
-}
+#[cfg(any(feature = "snap", test))]
+mod snappy_codec {
+    use snap::raw::{decompress_len, max_compress_len, Decoder, Encoder};
 
-impl SnappyCodec {
-    /// Creates new Snappy compression codec.
-    fn new() -> Self {
-        Self {
-            decoder: Decoder::new(),
-            encoder: Encoder::new(),
-        }
-    }
-}
+    use crate::compression::Codec;
+    use crate::errors::Result;
 
-impl Codec for SnappyCodec {
-    fn decompress(
-        self,
-        input_buf: &[u8],
-        output_buf: Vec,
-    ) -> Result {
-        let len = decompress_len(input_buf)?;
-        output_buf.resize(len, 0);
-        self.decoder
-            .decompr
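The feature-gating pattern in the diff above can be illustrated with a minimal, dependency-free sketch. It is not the parquet crate's code: the `CodecType` enum, the `Codec` trait with its `name` method, the stub `SnappyCodec`, and the `String` error type are all stand-ins invented for this illustration. Only the `#[cfg(feature = "...")]` attribute on a module and on a match arm mirrors what the commit does.

```rust
// Hypothetical stand-in for the crate's codec enum (assumption, not the real type).
#[derive(Debug, Clone, Copy)]
pub enum CodecType {
    Uncompressed,
    Snappy,
}

// Hypothetical stand-in for the crate's `Codec` trait; the real trait has
// compress/decompress methods instead of `name`.
pub trait Codec {
    fn name(&self) -> &'static str;
}

// The whole module is compiled only when the (assumed) "snap" feature is on,
// so the optional dependency is never referenced otherwise.
#[cfg(feature = "snap")]
mod snappy_codec {
    use super::Codec;

    // Stub codec; a real implementation would wrap the `snap` crate.
    pub struct SnappyCodec;

    impl Codec for SnappyCodec {
        fn name(&self) -> &'static str {
            "snappy"
        }
    }
}

/// Returns `Ok(None)` for `Uncompressed`, a boxed codec when the matching
/// feature is compiled in, and an error otherwise.
pub fn create_codec(codec: CodecType) -> Result<Option<Box<dyn Codec>>, String> {
    match codec {
        // This arm disappears entirely when the feature is disabled.
        #[cfg(feature = "snap")]
        CodecType::Snappy => Ok(Some(Box::new(snappy_codec::SnappyCodec))),
        CodecType::Uncompressed => Ok(None),
        // Catches codecs whose feature was not enabled. With every feature
        // on, this arm is unreachable, hence the lint allowance.
        #[allow(unreachable_patterns)]
        _ => Err(format!("The codec type {:?} is not supported yet", codec)),
    }
}
```

Built without the feature, `create_codec(CodecType::Snappy)` falls through to the catch-all arm and returns an error at runtime rather than failing to compile. A downstream crate would typically opt out of the defaults in its own manifest with Cargo's `default-features = false` and then list only the features it needs.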
[arrow] branch master updated (51108e3 -> 44e723d)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git.

    from 51108e3  ARROW-6111: [Java] Support LargeVarChar and LargeBinary types
     add 44e723d  ARROW-8455: [Rust] Parquet Arrow column read on partially compatible files FIX

No new revisions were added by this update.

Summary of changes:
 rust/parquet/src/arrow/array_reader.rs |  19 +--
 rust/parquet/src/arrow/arrow_reader.rs | 100 -
 2 files changed, 89 insertions(+), 30 deletions(-)
[arrow] branch master updated (bb2ae8f -> 61bcf62)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git.

    from bb2ae8f  ARROW-8889: [Python] avoid SIGSEGV when comparing RecordBatch to None
     add 61bcf62  ARROW-8455: [Rust] Parquet Arrow column read on partially compatible files

No new revisions were added by this update.

Summary of changes:
 rust/parquet/src/arrow/array_reader.rs | 18 --
 rust/parquet/src/column/reader.rs      | 15 ++-
 2 files changed, 18 insertions(+), 15 deletions(-)
[arrow] branch master updated (46040c9 -> 021ff20)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git.

    from 46040c9  ARROW-8680: [Rust] Fix ComplexObjectArray null value shifting
     add 021ff20  ARROW-8752: [Rust] remove unused hashmaps in build_array_reader

No new revisions were added by this update.

Summary of changes:
 rust/parquet/src/arrow/array_reader.rs | 11 +--
 1 file changed, 1 insertion(+), 10 deletions(-)
[arrow] branch master updated (cd872b4 -> 46040c9)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git.

    from cd872b4  ARROW-8768: [R][CI] Fix nightly as-cran spurious failure
     add 46040c9  ARROW-8680: [Rust] Fix ComplexObjectArray null value shifting

No new revisions were added by this update.

Summary of changes:
 rust/parquet/src/arrow/array_reader.rs | 142 +++--
 1 file changed, 134 insertions(+), 8 deletions(-)
[arrow] branch master updated (8a791c5 -> 8f64211)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git.

    from 8a791c5  ARROW-8562: [C++] IO: Parameterize I/O Coalescing using S3 metrics
     add 8f64211  ARROW-8659: [Rust] ListBuilder allocate with_capacity

No new revisions were added by this update.

Summary of changes:
 rust/arrow/src/array/builder.rs     | 40 ++---
 rust/parquet/src/arrow/converter.rs |  7 ++-
 2 files changed, 39 insertions(+), 8 deletions(-)