[GitHub] [arrow] cyb70289 commented on a change in pull request #9604: ARROW-11567: [C++][Compute] Improve variance kernel precision
cyb70289 commented on a change in pull request #9604: URL: https://github.com/apache/arrow/pull/9604#discussion_r585317366

## File path: cpp/src/arrow/compute/kernels/aggregate_var_std.cc

@@ -30,6 +30,96 @@ namespace internal { namespace {

 using arrow::internal::int128_t;
+using arrow::internal::VisitSetBitRunsVoid;
+
+// non-recursive pairwise summation for floating points
+// https://en.wikipedia.org/wiki/Pairwise_summation
+template <typename ValueType, typename SumType, typename ValueFunc>
+enable_if_t<std::is_floating_point<SumType>::value, SumType> SumArray(
+    const ArrayData& data, ValueFunc&& func) {
+  const int64_t data_size = data.length - data.GetNullCount();
+  if (data_size == 0) {
+    return 0;
+  }
+
+  // number of inputs to accumulate before merging with another block
+  constexpr int kBlockSize = 16;  // same as numpy
+  // levels (tree depth) = ceil(log2(len)) + 1, a bit larger than necessary
+  const int levels = BitUtil::Log2(static_cast<uint64_t>(data_size)) + 1;
+  // temporary summation per level
+  std::vector<SumType> sum(levels);
+  // whether two summations are ready and should be reduced to upper level
+  // one bit for each level, bit0 -> level0, ...
+  uint64_t mask = 0;
+  // level of root node holding the final summation
+  int root_level = 0;
+
+  // reduce summation of one block (may be smaller than kBlockSize) from leaf node
+  // continue reducing to upper level if two summations are ready for non-leaf node
+  auto reduce = [&](SumType block_sum) {
+    int cur_level = 0;
+    uint64_t cur_level_mask = 1ULL;
+    sum[cur_level] += block_sum;
+    mask ^= cur_level_mask;
+    while ((mask & cur_level_mask) == 0) {
+      block_sum = sum[cur_level];
+      sum[cur_level] = 0;
+      ++cur_level;
+      DCHECK_LT(cur_level, levels);
+      cur_level_mask <<= 1;
+      sum[cur_level] += block_sum;
+      mask ^= cur_level_mask;
+    }
+    root_level = std::max(root_level, cur_level);
+  };
+
+  const ValueType* values = data.GetValues<ValueType>(1);
+  VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+                      [&](int64_t pos, int64_t len) {
+                        const ValueType* v = &values[pos];
+                        const int64_t blocks = len / kBlockSize;
+                        const int64_t remains = len % kBlockSize;

Review comment: Changed to unsigned division. Not using shift/mask, as kBlockSize may not be a power of two.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
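The binary-counter merge above (one `mask` bit per tree level, carries propagating upward) can be sketched outside of Arrow. A minimal Python analogue, with hypothetical names, assuming the same block size of 16 as the C++ code:

```python
import math

def pairwise_sum(values, block_size=16):
    """Non-recursive pairwise summation: block sums are merged upward
    whenever two sibling partial sums at a level are ready, tracked by
    one bit per level in `mask` (a binary counter with carry)."""
    n = len(values)
    if n == 0:
        return 0.0
    levels = n.bit_length() + 1      # ceil(log2(n)) + 1 is more than enough
    sums = [0.0] * levels            # one partial sum per tree level
    mask = 0                         # bit i set => level i holds a pending sum
    root_level = 0

    def reduce(block_sum):
        nonlocal mask, root_level
        level = 0
        bit = 1
        sums[level] += block_sum
        mask ^= bit
        while (mask & bit) == 0:     # bit flipped 1 -> 0: carry to next level
            carried = sums[level]
            sums[level] = 0.0
            level += 1
            bit <<= 1
            sums[level] += carried
            mask ^= bit
        root_level = max(root_level, level)

    for start in range(0, n, block_size):
        reduce(sum(values[start:start + block_size]))

    # fold leftover partial sums, lowest level first
    total = 0.0
    for partial in sums:
        total += partial
    return total
```

Compared with a naive left-to-right loop, the error grows as O(log n) rather than O(n) rounding steps, which is the precision improvement this PR is after.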
[GitHub] [arrow] cyb70289 commented on a change in pull request #9604: ARROW-11567: [C++][Compute] Improve variance kernel precision
cyb70289 commented on a change in pull request #9604: URL: https://github.com/apache/arrow/pull/9604#discussion_r585316069

## File path: cpp/src/arrow/compute/kernels/aggregate_test.cc

@@ -1205,6 +1205,21 @@ TEST_F(TestVarStdKernelMergeStability, Basics) { #endif }

+// Test round-off error
+class TestVarStdKernelRoundOff : public TestPrimitiveVarStdKernel<DoubleType> {};
+
+TEST_F(TestVarStdKernelRoundOff, Basics) {
+  // build array: np.arange(321000, dtype='float64')
+  double value = 0;
+  ASSERT_OK_AND_ASSIGN(
+      auto array, ArrayFromBuilderVisitor(float64(), 321000, [&](DoubleBuilder* builder) {
+        builder->UnsafeAppend(value++);
+      }));
+
+  // reference value from numpy.var()
+  this->AssertVarStdIs(*array, VarianceOptions{0}, 8586749999.916667);
+}

Review comment: Hmm... some difference from numpy. Our variance kernel always returns `double` for any input type. Numpy can select the output dtype; by default, it returns `float32` for `float32` input and `double` for all other types. https://numpy.org/doc/stable/reference/generated/numpy.var.html I would prefer always returning `double`. It's simpler and looks more reasonable.
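The reference value for this test can be cross-checked without numpy: the population variance of the integers 0..n-1 has the closed form (n² - 1)/12. A quick Python check (ddof=0, matching `VarianceOptions{0}`), using exact summation to keep the two-pass computation honest:

```python
import math

n = 321000
values = [float(i) for i in range(n)]

# two-pass population variance (ddof=0) with exactly rounded sums
mean = math.fsum(values) / n
var = math.fsum((v - mean) ** 2 for v in values) / n

# closed form for the variance of 0..n-1
closed_form = (n * n - 1) / 12.0
```

A naive single-pass accumulation over this input is exactly the kind of computation whose round-off the pairwise-summation change is meant to reduce.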
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r585290009

## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java

@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    PlatformDependent.copyMemory(
+        outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length);
+    compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >=
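The buffer layout the Java codec produces — an 8-byte little-endian uncompressed-length prefix followed by the compressed payload — can be illustrated in a few lines. A sketch in Python with zlib standing in for LZ4 (the Python stdlib has no LZ4 codec; the framing, not the compressor, is the point, and the function names are hypothetical):

```python
import struct
import zlib

SIZE_OF_UNCOMPRESSED_LENGTH = 8  # mirrors CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH

def compress_buffer(data: bytes) -> bytes:
    """Prefix the payload with its uncompressed length as a little-endian
    int64, mirroring the Arrow IPC compressed-buffer layout."""
    if not data:
        # shortcut for empty buffer: length prefix of 0, no payload
        return struct.pack('<q', 0)
    body = zlib.compress(data)  # stand-in for a framed LZ4 compressor
    return struct.pack('<q', len(data)) + body

def decompress_buffer(buf: bytes) -> bytes:
    """Read the length prefix, decompress the rest, and verify the length."""
    (length,) = struct.unpack_from('<q', buf, 0)
    if length == 0:
        return b''
    out = zlib.decompress(buf[SIZE_OF_UNCOMPRESSED_LENGTH:])
    assert len(out) == length, "length prefix mismatch"
    return out
```

The real codec additionally falls back to sending the raw bytes when compression would enlarge the buffer (the `compressRawBuffer` branch above); this sketch omits that case.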
[GitHub] [arrow] codecov-io edited a comment on pull request #9592: ARROW-11803: [Rust] [Parquet] Support v2 LogicalType
codecov-io edited a comment on pull request #9592: URL: https://github.com/apache/arrow/pull/9592#issuecomment-787041283

# [Codecov](https://codecov.io/gh/apache/arrow/pull/9592?src=pr=h1) Report

> Merging [#9592](https://codecov.io/gh/apache/arrow/pull/9592?src=pr=desc) (132949a) into [master](https://codecov.io/gh/apache/arrow/commit/35daca2ef999ab05e801e9ec9aad977115b61231?el=desc) (35daca2) will **decrease** coverage by `0.07%`.
> The diff coverage is `81.28%`.

[![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/9592/graphs/tree.svg?width=650=150=pr=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/9592?src=pr=tree)

```diff
@@            Coverage Diff             @@
##           master    #9592      +/-   ##
==========================================
- Coverage   82.32%   82.25%   -0.08%
==========================================
  Files         245      245
  Lines       56412    56797     +385
==========================================
+ Hits        46442    46719     +277
- Misses       9970    10078     +108
```

| [Impacted Files](https://codecov.io/gh/apache/arrow/pull/9592?src=pr=tree) | Coverage Δ | |
|---|---|---|
| [rust/benchmarks/src/bin/tpch.rs](https://codecov.io/gh/apache/arrow/pull/9592/diff?src=pr=tree#diff-cnVzdC9iZW5jaG1hcmtzL3NyYy9iaW4vdHBjaC5ycw==) | `38.33% <ø> (ø)` | |
| [rust/arrow/src/datatypes/field.rs](https://codecov.io/gh/apache/arrow/pull/9592/diff?src=pr=tree#diff-cnVzdC9hcnJvdy9zcmMvZGF0YXR5cGVzL2ZpZWxkLnJz) | `55.47% <47.61%> (-0.66%)` | :arrow_down: |
| [rust/parquet/src/schema/visitor.rs](https://codecov.io/gh/apache/arrow/pull/9592/diff?src=pr=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9zY2hlbWEvdmlzaXRvci5ycw==) | `68.42% <66.66%> (ø)` | |
| [rust/parquet/src/arrow/array\_reader.rs](https://codecov.io/gh/apache/arrow/pull/9592/diff?src=pr=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9hcnJheV9yZWFkZXIucnM=) | `77.53% <75.00%> (-0.08%)` | :arrow_down: |
| [rust/parquet/src/record/reader.rs](https://codecov.io/gh/apache/arrow/pull/9592/diff?src=pr=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9yZWNvcmQvcmVhZGVyLnJz) | `91.09% <75.00%> (ø)` | |
| [rust/parquet/src/basic.rs](https://codecov.io/gh/apache/arrow/pull/9592/diff?src=pr=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9iYXNpYy5ycw==) | `87.22% <77.28%> (-10.04%)` | :arrow_down: |
| [rust/parquet/src/schema/types.rs](https://codecov.io/gh/apache/arrow/pull/9592/diff?src=pr=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9zY2hlbWEvdHlwZXMucnM=) | `89.79% <78.48%> (+0.25%)` | :arrow_up: |
| [rust/arrow/src/datatypes/schema.rs](https://codecov.io/gh/apache/arrow/pull/9592/diff?src=pr=tree#diff-cnVzdC9hcnJvdy9zcmMvZGF0YXR5cGVzL3NjaGVtYS5ycw==) | `73.39% <78.57%> (+0.76%)` | :arrow_up: |
| [rust/parquet/src/arrow/schema.rs](https://codecov.io/gh/apache/arrow/pull/9592/diff?src=pr=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9zY2hlbWEucnM=) | `92.36% <87.23%> (ø)` | |
| [rust/parquet/src/schema/parser.rs](https://codecov.io/gh/apache/arrow/pull/9592/diff?src=pr=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9zY2hlbWEvcGFyc2VyLnJz) | `90.20% <88.88%> (ø)` | |
| ... and [9 more](https://codecov.io/gh/apache/arrow/pull/9592/diff?src=pr=tree-more) | |

[Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/9592?src=pr=continue).

> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/9592?src=pr=footer). Last update [7184c3f...132949a](https://codecov.io/gh/apache/arrow/pull/9592?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
[GitHub] [arrow] JkSelf commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
JkSelf commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r585252853

## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
[GitHub] [arrow] nevi-me commented on a change in pull request #9592: ARROW-11803: [Rust] [Parquet] Support v2 LogicalType
nevi-me commented on a change in pull request #9592: URL: https://github.com/apache/arrow/pull/9592#discussion_r585250535

## File path: rust/parquet/src/schema/types.rs

@@ -972,18 +1011,22 @@ fn from_thrift_helper( }

 /// Method to convert to Thrift.
-pub fn to_thrift(schema: &Type) -> Result<Vec<SchemaElement>> {
+pub fn to_thrift(schema: &Type, writer_version: i32) -> Result<Vec<SchemaElement>> {

Review comment: I see what you mean @sunchao. I've removed the version check, and always write the logical type. I suppose I'm not thinking of this well from a compatibility perspective. We'll always want to write in compliance with whatever `parquet-format` version we're using. There's still something that's unclear to me about how we'll deal with the text schema format, but I can raise the questions when I work on its relevant PR.
[GitHub] [arrow] github-actions[bot] commented on pull request #9611: ARROW-11833: [C++] Fix missing architecture flag for vendored fast_float
github-actions[bot] commented on pull request #9611: URL: https://github.com/apache/arrow/pull/9611#issuecomment-788558370

https://issues.apache.org/jira/browse/ARROW-11833
[GitHub] [arrow] timkpaine opened a new pull request #9611: ARROW-11833: [C++] Fix missing architecture flag for vendored fast_float
timkpaine opened a new pull request #9611: URL: https://github.com/apache/arrow/pull/9611

Please don't merge until upstream https://github.com/fastfloat/fast_float/pull/63 is merged.
[GitHub] [arrow] sundy-li edited a comment on pull request #9602: ARROW-11630: [Rust] Introduce limit option for sort kernel
sundy-li edited a comment on pull request #9602: URL: https://github.com/apache/arrow/pull/9602#issuecomment-788550187

I removed `pdqsort` because the performance didn't show any improvement in the benches on my PC. Added a separate function `partial_sort` as @jorgecarleitao suggested. I did not find any other `partial_sort` function in Rust like std::partial_sort in C++, so I created [one](https://github.com/sundy-li/partial_sort). The [bench results for partial_sort in arrow](https://github.com/apache/arrow/blob/dc3167e3826177af1a6feec516a528a4ee38c674/rust/arrow/benches/sort_kernel.rs#L76-L106) are:

```
sort 2^12                   time: [753.58 us 755.43 us 758.14 us]
sort nulls 2^12             time: [633.41 us 635.28 us 637.51 us]
sort 2^12 limit 10          time: [49.246 us 49.820 us 50.667 us]
sort 2^12 limit 100         time: [115.11 us 116.26 us 117.76 us]
sort 2^12 limit 1000        time: [645.91 us 654.36 us 663.78 us]
sort nulls 2^12 limit 10    time: [66.283 us 66.725 us 67.347 us]
sort nulls 2^12 limit 100   time: [76.281 us 77.907 us 79.783 us]
sort nulls 2^12 limit 1000  time: [258.98 us 260.32 us 262.24 us]
```
[GitHub] [arrow] sundy-li commented on a change in pull request #9602: ARROW-11630: [Rust] Introduce limit option for sort kernel
sundy-li commented on a change in pull request #9602: URL: https://github.com/apache/arrow/pull/9602#discussion_r585214440

## File path: rust/arrow/src/compute/kernels/sort.rs

@@ -36,8 +36,12 @@ use TimeUnit::*;

 ///
 /// Returns an `ArrowError::ComputeError(String)` if the array type is either unsupported by `sort_to_indices` or `take`.
 ///
-pub fn sort(values: &ArrayRef, options: Option<SortOptions>) -> Result<ArrayRef> {
-    let indices = sort_to_indices(values, options)?;
+pub fn sort(
+    values: &ArrayRef,
+    options: Option<SortOptions>,
+    limit: Option<usize>,

Review comment: OK, I will introduce `partial_sort` with a `limit` parameter.
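A sort kernel with a `limit` only needs to fully order the first `limit` elements, which is why the benchmarks above improve so much for small limits. A hypothetical Python equivalent using heap selection (the spiritual counterpart of C++ `std::partial_sort`):

```python
import heapq

def sort_with_limit(values, limit=None):
    """Return values in ascending order; when `limit` is given, produce
    only the smallest `limit` elements instead of fully sorting."""
    if limit is None or limit >= len(values):
        return sorted(values)
    # heapq.nsmallest runs in O(n log limit), like a partial sort,
    # versus O(n log n) for a full sort
    return heapq.nsmallest(limit, values)
```

The win is largest when `limit` is much smaller than the array, matching the `limit 10` rows in the benchmark output.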
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r585195875

## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r585195604 ## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; + +import io.netty.util.internal.PlatformDependent; + +/** + * Compression codec for the LZ4 algorithm. 
+ */ +public class Lz4CompressionCodec implements CompressionCodec { + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { +Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The uncompressed buffer size exceeds the integer limit"); + +if (uncompressedBuffer.writerIndex() == 0L) { + // shortcut for empty buffer + ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + compressedBuffer.setLong(0, 0); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + uncompressedBuffer.close(); + return compressedBuffer; +} + +try { + ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer); + long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; + if (compressedLength > uncompressedBuffer.writerIndex()) { +// compressed buffer is larger, send the raw buffer +compressedBuffer.close(); +compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer); + } + + uncompressedBuffer.close(); + return compressedBuffer; +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException { +byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; +PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex()); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +try (InputStream in = new ByteArrayInputStream(inBytes); + OutputStream out = new FramedLZ4CompressorOutputStream(baos)) { + IOUtils.copy(in, out); +} + +byte[] outBytes = baos.toByteArray(); + +ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + +long uncompressedLength = uncompressedBuffer.writerIndex(); +if (!MemoryUtil.LITTLE_ENDIAN) { + uncompressedLength = 
Long.reverseBytes(uncompressedLength); +} +// first 8 bytes reserved for uncompressed length, to be consistent with the +// C++ implementation. +compressedBuffer.setLong(0, uncompressedLength); + +PlatformDependent.copyMemory( +outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length); +compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); +return compressedBuffer; + } + + @Override + public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { +Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The compressed buffer size exceeds the integer limit"); + +Preconditions.checkArgument(compressedBuffer.writerIndex() >=
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r585195259 ## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; + +import io.netty.util.internal.PlatformDependent; + +/** + * Compression codec for the LZ4 algorithm. 
+ */ +public class Lz4CompressionCodec implements CompressionCodec { + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { +Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The uncompressed buffer size exceeds the integer limit"); + +if (uncompressedBuffer.writerIndex() == 0L) { + // shortcut for empty buffer + ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + compressedBuffer.setLong(0, 0); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + uncompressedBuffer.close(); + return compressedBuffer; +} + +try { + ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer); + long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; + if (compressedLength > uncompressedBuffer.writerIndex()) { +// compressed buffer is larger, send the raw buffer +compressedBuffer.close(); +compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer); + } + + uncompressedBuffer.close(); + return compressedBuffer; +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException { +byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; +PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex()); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +try (InputStream in = new ByteArrayInputStream(inBytes); + OutputStream out = new FramedLZ4CompressorOutputStream(baos)) { + IOUtils.copy(in, out); +} + +byte[] outBytes = baos.toByteArray(); + +ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + +long uncompressedLength = uncompressedBuffer.writerIndex(); +if (!MemoryUtil.LITTLE_ENDIAN) { + uncompressedLength = 
Long.reverseBytes(uncompressedLength); +} +// first 8 bytes reserved for uncompressed length, to be consistent with the +// C++ implementation. +compressedBuffer.setLong(0, uncompressedLength); + +PlatformDependent.copyMemory( +outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length); +compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); +return compressedBuffer; + } + + @Override + public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { +Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The compressed buffer size exceeds the integer limit"); + +Preconditions.checkArgument(compressedBuffer.writerIndex() >=
[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on pull request #8949: URL: https://github.com/apache/arrow/pull/8949#issuecomment-788520085 > @liyafan82 Thanks for your great work! I have cherry-picked your code in my project to enable LZ4 compression, and I encountered the following two bugs. Looking forward to your response. Thanks. @JkSelf Thanks a lot for your effort and feedback. This PR is still under development, and some problems are not resolved yet (you may observe that the integration tests are still breaking). I will try to resolve the problems in a few days. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] sundy-li commented on pull request #9596: ARROW-11495: [Rust] Better numerical_coercion
sundy-li commented on pull request #9596: URL: https://github.com/apache/arrow/pull/9596#issuecomment-788519719 > Also for memory/performance concerns - I think it makes sense to think about those as well. We will do some extra copies by casting, so `u8 + u8` will be converted if I understand correctly to `c(u8, u16) + c(u8, u16)`, but `u8 + u8 + u8` will involve more copies to something like `c(c(u8, u16) + c(u8, u16), u32) + c(u8, u32)` etc. as the subexpressions are coerced to u16 already first. This could be optimized out a bit maybe to avoid the double casting, but it shows it can be quite inefficient. > > Also when doing IO and keeping the result in memory will result in more storage costs, longer latency and/or higher memory usage if you don't convert them to use smaller types again. Yes, it is. But correctness is more important than efficiency, so it may be worth doing that. I'm a fan of ClickHouse; ClickHouse always pursues the ultimate performance, but it had to do this too. About the behavior in MySQL:
```
mysql> select 3 / 2, 2 - 3, 127 + 3, 255 + 8;
+--------------+-------------+--------------+--------------+
| divide(3, 2) | minus(2, 3) | plus(127, 3) | plus(255, 8) |
+--------------+-------------+--------------+--------------+
|          1.5 |          -1 |          130 |          263 |
+--------------+-------------+--------------+--------------+
1 row in set (0.01 sec)
```
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
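The widening-coercion trade-off discussed above can be sketched in plain Rust. This is an illustrative example, not DataFusion's actual coercion code: casting both `u8` operands up to `u16` before adding costs an extra copy, but it preserves the mathematically correct result that the MySQL output shows, whereas native `u8` arithmetic would wrap.

```rust
// Sketch of widening coercion: c(u8, u16) + c(u8, u16).
// `add_coerced` is a hypothetical helper, not a DataFusion function.
fn add_coerced(a: u8, b: u8) -> u16 {
    // cast both operands up front so the sum cannot overflow
    a as u16 + b as u16
}

fn main() {
    // 255 + 8 wraps to 7 in native u8 arithmetic, but the coerced
    // addition yields 263, matching the MySQL behavior quoted above.
    assert_eq!(add_coerced(255, 8), 263);
    assert_eq!(255u8.wrapping_add(8), 7);
    println!("coerced sum: {}", add_coerced(255, 8));
}
```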
[GitHub] [arrow] ianmcook commented on a change in pull request #9610: ARROW-11735: [R] Allow Parquet and Arrow Dataset to be optional components
ianmcook commented on a change in pull request #9610: URL: https://github.com/apache/arrow/pull/9610#discussion_r585150504 ## File path: r/tests/testthat/latin1.R ## @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -x <- iconv("Veitingasta�ir", to = "latin1") +x <- iconv("Veitingasta�ir", to = "latin1") Review comment: cmon VS Code! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] nealrichardson commented on a change in pull request #9610: ARROW-11735: [R] Allow Parquet and Arrow Dataset to be optional components
nealrichardson commented on a change in pull request #9610: URL: https://github.com/apache/arrow/pull/9610#discussion_r585149473 ## File path: r/tests/testthat/latin1.R ## @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -x <- iconv("Veitingasta�ir", to = "latin1") +x <- iconv("Veitingasta�ir", to = "latin1") Review comment: Yeah you need to open this in a text editor that won't reencode to UTF-8 (`vim` may be suited for this), or otherwise set the encoding in the editor to latin1 (or whatever its official name is). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] ianmcook commented on a change in pull request #9610: ARROW-11735: [R] Allow Parquet and Arrow Dataset to be optional components
ianmcook commented on a change in pull request #9610: URL: https://github.com/apache/arrow/pull/9610#discussion_r585147423 ## File path: r/tests/testthat/latin1.R ## @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -x <- iconv("Veitingasta�ir", to = "latin1") +x <- iconv("Veitingasta�ir", to = "latin1") Review comment: Whoa, no This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] ianmcook commented on a change in pull request #9610: ARROW-11735: [R] Allow Parquet and Arrow Dataset to be optional components
ianmcook commented on a change in pull request #9610: URL: https://github.com/apache/arrow/pull/9610#discussion_r585147323 ## File path: r/tests/testthat/test-python-flight.R ## @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. +skip("Temporarily skip this") Review comment: Ah shoot, sorry, I wasn't supposed to commit that This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] nealrichardson commented on pull request #8650: ARROW-10570: [R] Use Converter API to convert SEXP to Array/ChunkedArray
nealrichardson commented on pull request #8650: URL: https://github.com/apache/arrow/pull/8650#issuecomment-788416300 Followup is ARROW-11832. Thanks @jonkeane for digging! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] nealrichardson closed pull request #8650: ARROW-10570: [R] Use Converter API to convert SEXP to Array/ChunkedArray
nealrichardson closed pull request #8650: URL: https://github.com/apache/arrow/pull/8650 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] nealrichardson commented on a change in pull request #9610: ARROW-11735: [R] Allow Parquet and Arrow Dataset to be optional components
nealrichardson commented on a change in pull request #9610: URL: https://github.com/apache/arrow/pull/9610#discussion_r585139173 ## File path: r/tests/testthat/test-python-flight.R ## @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. +skip("Temporarily skip this") Review comment: Remove this (also why?) ## File path: r/data-raw/codegen.R ## @@ -194,10 +194,16 @@ arrow_exports_cpp <- glue::glue(' {feature_available("arrow")} +{feature_available("dataset")} Review comment: It might be good to make these bits iterate over `features` rather than copy-paste, just to help with future features. ## File path: r/tests/testthat/test-dataset.R ## @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. +skip_if_not_available("dataset") Review comment: You can skip a whole file like this? Cool. ## File path: r/tests/testthat/latin1.R ## @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -x <- iconv("Veitingasta�ir", to = "latin1") +x <- iconv("Veitingasta�ir", to = "latin1") Review comment: Was this change intentional? ## File path: r/R/dataset-partition.R ## @@ -76,7 +76,9 @@ HivePartitioning$create <- dataset___HivePartitioning #' calling `hive_partition()` with no arguments. #' @examples #' \donttest{ -#' hive_partition(year = int16(), month = int8()) +#' if (arrow_with_dataset()) { Review comment: Dev roxygen2 has an `@examplesIf` feature that might be better here. If we put the conditional in the example code, it may suggest to the user that they also need to check this every time, and generally they shouldn't. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] jonkeane commented on pull request #8650: ARROW-10570: [R] Use Converter API to convert SEXP to Array/ChunkedArray
jonkeane commented on pull request #8650: URL: https://github.com/apache/arrow/pull/8650#issuecomment-788404473 Ok, I've looked at this more, and my first reprex was a bit _too_ minimal it looks like. Here is another example/test we could/should add to exercise this (I'm happy to push a commit including it to this branch if you would like): ``` test_that("sf-like list columns", { df <- structure(list(col = structure(list(structure(list(list(structure(1))), class = "inner")), class = "outer")), class = "data.frame") expect_array_roundtrip(df) }) ``` The fix that you made does fix an error on the `inner` "class", but I believe that https://github.com/apache/arrow/blob/fe1c774813e9ce7123f6bb02c43bca1664e8370b/r/src/r_to_arrow.cpp#L1014 is tripping on the `outer` "class". This was extra-fun to debug (and explains why the sf example above worked) because it looks like sf [registers some vctrs](https://github.com/r-spatial/sf/blob/master/R/tidyverse-vctrs.R) methods which mean that these will work so long as those have been registered (i.e. whenever `sf` is used). I suspect that it would be very infrequent to have someone want to round-trip a parquet file including sf data without also having sf loaded (so in practice the current state would be fine), attempting to debug what's going on is super complicated (and any other non-standard vctrs that use that outer level style classing would similarly fail). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] alamb commented on pull request #9608: ARROW-11742: [Rust][DataFusion] Add Expr::is_null and Expr::is_not_nu…
alamb commented on pull request #9608: URL: https://github.com/apache/arrow/pull/9608#issuecomment-788388761 @NGA-TRAN there appears to be a clippy failure: https://github.com/apache/arrow/pull/9608/checks?check_run_id=2007813369
```
error: methods called `is_*` usually take self by reference or no self; consider choosing a less ambiguous name
   --> datafusion/src/logical_plan/expr.rs:446:20
    |
446 |     pub fn is_null(self) -> Expr {
    |
    = note: `-D clippy::wrong-self-convention` implied by `-D warnings`
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#wrong_self_convention

error: methods called `is_*` usually take self by reference or no self; consider choosing a less ambiguous name
   --> datafusion/src/logical_plan/expr.rs:451:24
    |
451 |     pub fn is_not_null(self) -> Expr {
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#wrong_self_convention
```
I think given what we are doing (mirroring the expression names rather than actually implementing idiomatic Rust conversions), annotating those functions with something like
```
#[allow(clippy::wrong_self_convention)]
```
is probably the best approach This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
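A minimal sketch of the suggested annotation follows. Note that the lint path in an attribute uses underscores (`clippy::wrong_self_convention`); the `Expr` enum here is a simplified stand-in for DataFusion's expression type, used only to show the pattern of an `is_*` method that deliberately takes `self` by value to build a new expression.

```rust
// Simplified stand-in for DataFusion's Expr; not the real type.
#[derive(Debug, PartialEq)]
enum Expr {
    Col(String),
    IsNull(Box<Expr>),
}

impl Expr {
    // Takes `self` by value to consume and wrap the expression, which
    // clippy flags for `is_*` names; the allow keeps the SQL-mirroring name.
    #[allow(clippy::wrong_self_convention)]
    fn is_null(self) -> Expr {
        Expr::IsNull(Box::new(self))
    }
}

fn main() {
    let e = Expr::Col("a".to_string()).is_null();
    assert_eq!(e, Expr::IsNull(Box::new(Expr::Col("a".to_string()))));
}
```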
[GitHub] [arrow] github-actions[bot] commented on pull request #9610: ARROW-11735: [R] Allow Parquet and Arrow Dataset to be optional components
github-actions[bot] commented on pull request #9610: URL: https://github.com/apache/arrow/pull/9610#issuecomment-788387441 https://issues.apache.org/jira/browse/ARROW-11735 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] ianmcook opened a new pull request #9610: ARROW-11735: [R] Allow Parquet and Arrow Dataset to be optional components
ianmcook opened a new pull request #9610: URL: https://github.com/apache/arrow/pull/9610 Not implemented for Windows yet This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] nealrichardson commented on pull request #9591: ARROW-11729: [R] Add examples to the datasets documentation
nealrichardson commented on pull request #9591: URL: https://github.com/apache/arrow/pull/9591#issuecomment-788385951 I still see the extra files. I literally meant to suggest `git revert`, not undo the changes manually. So I would now do: ``` git revert cd0ecff git revert 389d793 ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] github-actions[bot] commented on pull request #9609: ARROW-11830: [C++] Don't re-detect gRPC every time
github-actions[bot] commented on pull request #9609: URL: https://github.com/apache/arrow/pull/9609#issuecomment-788373898 https://issues.apache.org/jira/browse/ARROW-11830 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] nealrichardson commented on a change in pull request #9591: ARROW-11729: [R] Add examples to the datasets documentation
nealrichardson commented on a change in pull request #9591: URL: https://github.com/apache/arrow/pull/9591#discussion_r585104212 ## File path: r/R/dataset-write.R ## @@ -71,25 +72,30 @@ #' d <- dplyr::group_by(mtcars, cyl, gear) #' #' d %>% -#' write_dataset(two_part_dir_2, "parquet") +#' write_dataset(two_part_dir_2) #' #' # Just passing an additional hive_style option to see the difference #' # in the output #' #' d %>% -#' write_dataset(two_part_dir_2, "parquet", hive_style = FALSE) +#' write_dataset(two_part_dir_2, hive_style = FALSE) #' #' list.files(two_part_dir_2, pattern = "parquet", recursive = TRUE) #' list.files(two_part_dir_3, pattern = "parquet", recursive = TRUE) #' } #' @export write_dataset <- function(dataset, path, - format = dataset$format, + format = "parquet", Review comment: I don't think this is the right change. Please see the proposal in https://issues.apache.org/jira/browse/ARROW-11582?focusedCommentId=17292994=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17292994. I'd recommend reverting this change and doing the format argument change in a separate PR associated with that JIRA issue, rather than grow the scope of this one. ## File path: r/R/dataset-write.R ## @@ -71,25 +72,30 @@ #' d <- dplyr::group_by(mtcars, cyl, gear) #' #' d %>% -#' write_dataset(two_part_dir_2, "parquet") +#' write_dataset(two_part_dir_2) #' #' # Just passing an additional hive_style option to see the difference #' # in the output #' #' d %>% -#' write_dataset(two_part_dir_2, "parquet", hive_style = FALSE) +#' write_dataset(two_part_dir_2, hive_style = FALSE) #' #' list.files(two_part_dir_2, pattern = "parquet", recursive = TRUE) #' list.files(two_part_dir_3, pattern = "parquet", recursive = TRUE) #' } #' @export write_dataset <- function(dataset, path, - format = dataset$format, + format = "parquet", partitioning = dplyr::group_vars(dataset), basename_template = paste0("part-{i}.", as.character(format)), hive_style = TRUE, ...) 
{ + if (inherits(dataset$format, "ParquetFileFormat")) { +format <- dataset$format +basename_template <- paste0("part-{i}.", as.character(format)) + } + Review comment: I don't think this helps anything. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] lidavidm opened a new pull request #9609: ARROW-11830: [C++] Don't re-detect gRPC every time
lidavidm opened a new pull request #9609: URL: https://github.com/apache/arrow/pull/9609 Cache the detected gRPC version since testing is slow. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] seddonm1 commented on pull request #9565: ARROW-11655: [Rust][DataFusion] Postgres String Functions: left, lpad, right, rpad
seddonm1 commented on pull request #9565: URL: https://github.com/apache/arrow/pull/9565#issuecomment-788291128 @alamb This is the PR with the type-coercion changes removed and the explicit `CAST(NULL AS INT)` re-added to ensure tests will pass. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] jorgecarleitao commented on a change in pull request #9602: ARROW-11630: [Rust] Introduce limit option for sort kernel
jorgecarleitao commented on a change in pull request #9602: URL: https://github.com/apache/arrow/pull/9602#discussion_r584928250 ## File path: rust/arrow/Cargo.toml ## @@ -51,6 +51,8 @@ flatbuffers = "^0.8" hex = "0.4" prettytable-rs = { version = "0.8.0", optional = true } lexical-core = "^0.7" +partial_sort = "0.1.1" +pdqsort = "1.0.3" Review comment: I am -0 on this without any benchmarks justifying this. Can't we at least feature-gate this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] seddonm1 commented on a change in pull request #9565: ARROW-11655: [Rust][DataFusion] Postgres String Functions: left, lpad, right, rpad
seddonm1 commented on a change in pull request #9565: URL: https://github.com/apache/arrow/pull/9565#discussion_r585042342 ## File path: rust/datafusion/src/physical_plan/type_coercion.rs ## @@ -168,20 +168,35 @@ fn maybe_data_types( pub fn can_coerce_from(type_into: , type_from: ) -> bool { use self::DataType::*; match type_into { -Int8 => matches!(type_from, Int8), -Int16 => matches!(type_from, Int8 | Int16 | UInt8), -Int32 => matches!(type_from, Int8 | Int16 | Int32 | UInt8 | UInt16), +Int8 => matches!(type_from, Int8 | Utf8 | LargeUtf8), Review comment: @jorgecarleitao @alamb I would like to get the rest of these SQL functions implemented then pick up the type-coercion work. I was also thinking a `try_cast` implementation but it will be a lot of work. I think we also need to produce a matrix like this: https://docs.microsoft.com/en-us/sql/t-sql/functions/cast-and-convert-transact-sql?view=sql-server-ver15#implicit-conversions to work out the scope of the changes as I think we have a very imbalanced set of implementations at the moment. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] seddonm1 commented on a change in pull request #9565: ARROW-11655: [Rust][DataFusion] Postgres String Functions: left, lpad, right, rpad
seddonm1 commented on a change in pull request #9565: URL: https://github.com/apache/arrow/pull/9565#discussion_r585040519 ## File path: rust/datafusion/src/physical_plan/string_expressions.rs ## @@ -361,10 +534,167 @@ pub fn ltrim(args: &[ArrayRef]) -> Result { } } -/// Converts the string to all lower case. -/// lower('TOM') = 'tom' -pub fn lower(args: &[ColumnarValue]) -> Result { -handle(args, |x| x.to_ascii_lowercase(), "lower") +/// Returns last n characters in the string, or when n is negative, returns all but first |n| characters. +/// right('abcde', 2) = 'de' +pub fn right(args: &[ArrayRef]) -> Result { +let string_array: = args[0] +.as_any() +.downcast_ref::>() +.ok_or_else(|| { +DataFusionError::Internal("could not cast string to StringArray".to_string()) +})?; + +let n_array: = +args[1] +.as_any() +.downcast_ref::() +.ok_or_else(|| { +DataFusionError::Internal("could not cast n to Int64Array".to_string()) +})?; + +let result = string_array +.iter() +.enumerate() +.map(|(i, x)| { +if n_array.is_null(i) { +None +} else { +x.map(|x: | { +let n: i64 = n_array.value(i); Review comment: Hi @jorgecarleitao thanks for this. My understanding is that one of the core properties of a `RecordBatch` is that all columns must have the same length: https://github.com/apache/arrow/blob/master/rust/arrow/src/record_batch.rs#L52 implemented here: https://github.com/apache/arrow/blob/master/rust/arrow/src/record_batch.rs#L134 From what I can see, if we did adopt a `zip` then we would implicitly be treating the shorter argument as a `None` which wont break the out of bounds check but might produce some very strange function results. I do agree with you that many of the core Rust Arrow implementations are throwing away the benefits of the Rust compiler so we should try to sensibly refactor for safety. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
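The `zip`-vs-`enumerate` trade-off discussed above can be sketched without any Arrow dependency. This is a minimal illustration, not the DataFusion implementation: `right_sketch` is a hypothetical name, and plain slices of `Option` stand in for `StringArray`/`Int64Array`. With `zip`, a length mismatch cannot read out of bounds, because iteration stops at the shorter side.

```rust
// Sketch: iterate two "columns" with zip instead of enumerate + value(i)/is_null(i).
fn right_sketch(strings: &[Option<&str>], ns: &[Option<i64>]) -> Vec<Option<String>> {
    strings
        .iter()
        .zip(ns.iter())
        .map(|(s, n)| match (s, n) {
            (Some(s), Some(n)) => {
                let chars: Vec<char> = s.chars().collect();
                let len = chars.len() as i64;
                // right('abcde', 2) = 'de'; negative n drops the first |n| chars
                let start = if *n >= 0 { (len - n).max(0) } else { (-n).min(len) };
                Some(chars[start as usize..].iter().collect())
            }
            // Either value NULL => result NULL
            _ => None,
        })
        .collect()
}

fn main() {
    let out = right_sketch(&[Some("abcde"), None], &[Some(2), Some(1)]);
    println!("{:?}", out); // [Some("de"), None]
}
```

Note how seddonm1's concern also shows up here: if `ns` is shorter than `strings`, the trailing rows silently disappear from the output instead of erroring, which is why a length check before zipping is still worthwhile.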
[GitHub] [arrow] seddonm1 commented on pull request #9567: ARROW-11775: [Rust][DataFusion] Feature Flags for Dependencies
seddonm1 commented on pull request #9567: URL: https://github.com/apache/arrow/pull/9567#issuecomment-788256916 @Dandandan this was discussed at the last Arrow Rust sync call. The intention was to allow reducing compile time of projects which depend on DataFusion - not the DataFusion project itself. I think that care should be taken to keep the dependencies sensible but ultimately if we are chasing some sort of SQL compatibility (i.e. Postgres) then having the standard set of SQL functions on by default seems the correct approach. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] seddonm1 commented on a change in pull request #9567: ARROW-11775: [Rust][DataFusion] Feature Flags for Dependencies
seddonm1 commented on a change in pull request #9567: URL: https://github.com/apache/arrow/pull/9567#discussion_r585028643

## File path: rust/datafusion/src/physical_plan/functions.rs

## @@ -447,6 +446,26 @@ pub fn return_type(

```diff
+#[cfg(feature = "crypto_expressions")]
+macro_rules! invoke_if_crypto_expressions_feature_flag {
+    ($FUNC:ident, $NAME:expr) => {{
+        use crate::physical_plan::crypto_expressions;
+        crypto_expressions::$FUNC
+    }};
+}
+
+#[cfg(not(feature = "crypto_expressions"))]
```

Review comment: @elferherrera I tried this but unfortunately the compiler does not like it. The `#[cfg]` attribute does not operate on `{..}` blocks. The end state of this was my least-bad solution. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
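The pattern seddonm1 landed on (two copies of the macro, selected at compile time) can be demonstrated standalone. This is a hedged sketch, not the DataFusion source: `Scalar`/`FuncResult` are stand-in types and `md5` is a placeholder identifier; only the feature name mirrors the PR. Compiled without the feature, the stub macro is the one that exists.

```rust
type Scalar = String;
type FuncResult = Result<Scalar, String>;

// With the feature: the macro expands to the real function path.
#[cfg(feature = "crypto_expressions")]
macro_rules! invoke_if_crypto_expressions_feature_flag {
    ($FUNC:ident, $NAME:expr) => {{
        crypto_expressions::$FUNC
    }};
}

// Without the feature: the macro expands to a stub closure returning an error.
#[cfg(not(feature = "crypto_expressions"))]
macro_rules! invoke_if_crypto_expressions_feature_flag {
    ($FUNC:ident, $NAME:expr) => {{
        |_: &Scalar| -> FuncResult {
            Err(format!(
                "function {} requires compilation with feature flag: crypto_expressions.",
                $NAME
            ))
        }
    }};
}

fn main() {
    // Built without the feature, the stub is selected:
    let f = invoke_if_crypto_expressions_feature_flag!(md5, "md5");
    println!("{:?}", f(&"abc".to_string()));
}
```

This sidesteps the problem the comment describes: each `#[cfg]` sits on a whole item (a `macro_rules!` definition), which is a position the attribute does support, rather than on a `{..}` block inside one macro body.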
[GitHub] [arrow] jorgecarleitao commented on a change in pull request #9602: ARROW-11630: [Rust] Introduce limit option for sort kernel
jorgecarleitao commented on a change in pull request #9602: URL: https://github.com/apache/arrow/pull/9602#discussion_r584928250

## File path: rust/arrow/Cargo.toml

## @@ -51,6 +51,8 @@

```diff
 flatbuffers = "^0.8"
 hex = "0.4"
 prettytable-rs = { version = "0.8.0", optional = true }
 lexical-core = "^0.7"
+partial_sort = "0.1.1"
+pdqsort = "1.0.3"
```

Review comment: I am -1 on this. Either add a feature gate or something else.

## File path: rust/arrow/src/compute/kernels/sort.rs

## @@ -36,8 +36,12 @@ use TimeUnit::*;

```diff
 ///
 /// Returns an `ArrowError::ComputeError(String)` if the array type is either unsupported by `sort_to_indices` or `take`.
 ///
-pub fn sort(values: &ArrayRef, options: Option<SortOptions>) -> Result<ArrayRef> {
-    let indices = sort_to_indices(values, options)?;
+pub fn sort(
+    values: &ArrayRef,
+    options: Option<SortOptions>,
+    limit: Option<usize>,
```

Review comment: I think that this should be done in a separate function instead of adding another parameter to this one. It would also allow us to feature-gate said function with the two new dependencies. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
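The "separate function" suggestion can be sketched with std only, without the `partial_sort`/`pdqsort` crates. This is an illustration of the idea, not the Arrow kernel: `sort_limited` is a hypothetical name, it works on a plain `Vec<i32>` instead of an `ArrayRef`, and it uses `select_nth_unstable` from the standard library to get the partial-sort payoff.

```rust
// Separate limited variant: `sort` stays unchanged; this function (which could
// be feature-gated on its own) sorts only the `limit` smallest values.
fn sort_limited(mut values: Vec<i32>, limit: Option<usize>) -> Vec<i32> {
    match limit {
        Some(n) if n < values.len() => {
            // Partition so the n smallest elements land in values[..n], then
            // order just that prefix: O(len + n log n) instead of O(len log len).
            values.select_nth_unstable(n);
            values.truncate(n);
            values.sort_unstable();
            values
        }
        _ => {
            // No limit (or limit >= len): behave like a full sort.
            values.sort_unstable();
            values
        }
    }
}

fn main() {
    println!("{:?}", sort_limited(vec![5, 3, 1, 4, 2], Some(3))); // [1, 2, 3]
}
```

Keeping this as its own entry point means callers of the plain `sort` pay neither the extra parameter nor the extra dependencies.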
[GitHub] [arrow] pitrou commented on a change in pull request #9582: PARQUET-1655: [C++] Fix comparison of Decimal values in statistics
pitrou commented on a change in pull request #9582: URL: https://github.com/apache/arrow/pull/9582#discussion_r584992780

## File path: cpp/src/parquet/statistics.cc

## @@ -133,56 +136,95 @@

```diff
   static T Max(int type_length, const T& a, const T& b) {
     return Compare(0, a, b) ? b : a;
   }
-};  // namespace parquet
-
-template <bool is_signed>
-struct ByteLikeCompareHelperBase {
-  using T = ByteArrayType::c_type;
-  using PtrType = typename std::conditional<is_signed, int8_t, uint8_t>::type;
+};
 
-  static T DefaultMin() { return {}; }
-  static T DefaultMax() { return {}; }
-  static T Coalesce(T val, T fallback) { return val; }
+template <typename T, bool is_signed>
+struct BinaryLikeComparer {};
 
-  static inline bool Compare(int type_length, const T& a, const T& b) {
-    const auto* aptr = reinterpret_cast<const PtrType*>(a.ptr);
-    const auto* bptr = reinterpret_cast<const PtrType*>(b.ptr);
-    return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + b.len);
+template <typename T>
+struct BinaryLikeComparer<T, false> {
+  static bool Compare(int type_length, const T& a, const T& b) {
+    int a_length = value_length(type_length, a);
+    int b_length = value_length(type_length, b);
+    // Unsigned comparison is used for non-numeric types so straight
+    // lexicographic comparison makes sense. (a.ptr is always unsigned)
+    return std::lexicographical_compare(a.ptr, a.ptr + a_length, b.ptr, b.ptr + b_length);
   }
+};
 
-  static T Min(int type_length, const T& a, const T& b) {
-    if (a.ptr == nullptr) return b;
-    if (b.ptr == nullptr) return a;
-    return Compare(type_length, a, b) ? a : b;
-  }
+template <typename T>
+struct BinaryLikeComparer<T, true> {
+  static bool Compare(int type_length, const T& a, const T& b) {
+    // Signed comparison is used for integers encoded as big-endian two's
+    // complement integers (e.g. decimals).
+    int a_length = value_length(type_length, a);
+    int b_length = value_length(type_length, b);
+
+    // At least one of the lengths is zero.
+    if (a_length == 0 || b_length == 0) {
+      return a_length == 0 && b_length > 0;
+    }
 
-  static T Max(int type_length, const T& a, const T& b) {
-    if (a.ptr == nullptr) return b;
-    if (b.ptr == nullptr) return a;
-    return Compare(type_length, a, b) ? b : a;
+    int8_t first_a = *a.ptr;
+    int8_t first_b = *b.ptr;
+    // We can short circuit for different signed numbers or
+    // for equal-length byte arrays that have different first bytes.
+    if ((0x80 & first_a) != (0x80 & first_b) ||
+        (a_length == b_length && first_a != first_b)) {
+      return first_a < first_b;
+    }
```

Review comment: Hmm, I see. I'll take a closer look :-) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
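The signed branch discussed above ends mid-routine in the excerpt, so here is an executable sketch of the whole comparison in Rust (not the Parquet C++ code): comparing big-endian two's-complement byte strings of possibly different lengths. The sign-extension loop after the short-circuit is my assumption about how such a comparison completes; the empty-input and sign-bit rules follow the excerpt.

```rust
// Sketch: "a < b" for big-endian two's-complement byte strings (as used for
// Parquet decimal statistics), without materializing the integers.
fn be_signed_less(a: &[u8], b: &[u8]) -> bool {
    // Empty input: only "empty < non-empty" can hold (mirrors the excerpt).
    if a.is_empty() || b.is_empty() {
        return a.is_empty() && !b.is_empty();
    }
    let (neg_a, neg_b) = (a[0] & 0x80 != 0, b[0] & 0x80 != 0);
    if neg_a != neg_b {
        return neg_a; // a negative, b non-negative => a < b
    }
    // Same sign: sign-extend the shorter side on the left, then compare
    // byte-by-byte. For two's complement this matches numeric order.
    let ext: u8 = if neg_a { 0xff } else { 0x00 };
    let len = a.len().max(b.len());
    let byte_at = |v: &[u8], i: usize| -> u8 {
        let pad = len - v.len();
        if i < pad { ext } else { v[i - pad] }
    };
    for i in 0..len {
        let (x, y) = (byte_at(a, i), byte_at(b, i));
        if x != y {
            return x < y;
        }
    }
    false // equal
}

fn main() {
    println!("{}", be_signed_less(&[0xff], &[0x01])); // -1 < 1
}
```

The subtle case is unequal lengths with equal sign: extending `[0xfe]` (-2) to `[0xff, 0xfe]` lets a plain bytewise compare rank it above `[0xff, 0x00]` (-256), which is the numerically correct order.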
[GitHub] [arrow] alamb commented on pull request #9608: ARROW-11742: [Rust][DataFusion] Add Expr::is_null and Expr::is_not_nu…
alamb commented on pull request #9608: URL: https://github.com/apache/arrow/pull/9608#issuecomment-788203755 @Dandandan / @andygrove any thoughts? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] elferherrera commented on pull request #9598: ARROW-11804: [Developer] Offer to create JIRA issue
elferherrera commented on pull request #9598: URL: https://github.com/apache/arrow/pull/9598#issuecomment-788199778 @jorgecarleitao after reading the discussion about the Slack channel I understand why they closed it. It makes sense, since they want to keep all conversations stored and they also want traceability. However, we should specify that these real-time chats should be used only for quick questions and ad hoc conversations. If someone wants to get consensus on a topic, they should be directed to the mailing list. That should be one of the many rules people using the chat must follow. Also, there should be individual channels for each language. The small mini-cells that are being created for each language should be able to talk to each other in a rather simple and quick way. Don't you think so? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] NGA-TRAN commented on pull request #9608: ARROW-11742: [Rust][DataFusion] Add Expr::is_null and Expr::is_not_nu…
NGA-TRAN commented on pull request #9608: URL: https://github.com/apache/arrow/pull/9608#issuecomment-788197836 > This code looks great @NGA-TRAN -- thank you for the contribution! > > Would it be possible to add some really basic test? Mostly to document the use of these functions and make sure they don't get lost / removed by accident Tests added, @alamb This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] Dandandan commented on pull request #9567: ARROW-11775: [Rust][DataFusion] Feature Flags for Dependencies
Dandandan commented on pull request #9567: URL: https://github.com/apache/arrow/pull/9567#issuecomment-788196901 The feature flags for the cryptography functions look good to me! Thanks @seddonm1 ! I think it is a good idea to make some things optional, to be able to exclude certain features. I think the features introduce some more complexity to the build, as we introduce more variants that could fail to compile or fail to pass tests in some configurations. So, overall I think we should strive to keep the usage relatively straightforward and avoid overly complex usage. If we can reduce the number of dependencies or move something out, I think that would be preferable if at all possible. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] Dandandan commented on a change in pull request #9567: ARROW-11775: [Rust][DataFusion] Feature Flags for Dependencies
Dandandan commented on a change in pull request #9567: URL: https://github.com/apache/arrow/pull/9567#discussion_r584973666 ## File path: rust/datafusion/Cargo.toml ## @@ -40,9 +40,10 @@ name = "datafusion-cli" path = "src/bin/main.rs" [features] -default = ["cli"] +default = ["cli", "crypto_expressions"] Review comment: By having them on by default, how do we reduce compile times? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] Dandandan edited a comment on pull request #9596: ARROW-11495: [Rust] Better numerical_coercion
Dandandan edited a comment on pull request #9596: URL: https://github.com/apache/arrow/pull/9596#issuecomment-788175032

> I think this implementation looks really nice -- thank you @sundy-li . I believe that the code does what the PR description says it does.
>
> I wonder if you happen to know what Postgres does in this situation (eg adding 2 32-bit numbers together?) I can imagine certain situations when the users wants to choose performance over avoiding possible overflow and would prefer not to upcast both arguments to 64-bit.
>
> @Dandandan / @andygrove what do you think about what the default behavior should be?

I think it could be acceptable to do a bit more here than PostgreSQL does? I think it will make DataFusion behave differently from PostgreSQL, however. E.g. PostgreSQL returns an error `integer out of range` on `SELECT 200 + 200` which I think the coercion will avoid by upcasting to 64-bit integers. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] Dandandan commented on pull request #9596: ARROW-11495: [Rust] Better numerical_coercion
Dandandan commented on pull request #9596: URL: https://github.com/apache/arrow/pull/9596#issuecomment-788185255 Also for memory/performance concerns - I think it makes sense to think about those as well. We will do some extra copies by casting, so `u8 + u8` will be converted if I understand correctly to `c(u8, u16) + c(u8, u16)`, but `u8 + u8 + u8` will involve more copies to something like `c(c(u8, u16)+ c(u8, u16), u32) + c(u8, u32)` etc. as the subexpressions are coerced to u16 already first. This could be optimized out a bit maybe to avoid the double casting, but it shows it can be quite inefficient. Also when doing IO and keeping the result in memory will result in more storage costs, longer latency and/or higher memory usage if you don't convert them to use smaller types again. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] Dandandan commented on pull request #9596: ARROW-11495: [Rust] Better numerical_coercion
Dandandan commented on pull request #9596: URL: https://github.com/apache/arrow/pull/9596#issuecomment-788175032

> I think this implementation looks really nice -- thank you @sundy-li . I believe that the code does what the PR description says it does.
>
> I wonder if you happen to know what Postgres does in this situation (eg adding 2 32-bit numbers together?) I can imagine certain situations when the users wants to choose performance over avoiding possible overflow and would prefer not to upcast both arguments to 64-bit.
>
> @Dandandan / @andygrove what do you think about what the default behavior should be?

I think it could be acceptable to do a bit more here than PostgreSQL does? I think it will make DataFusion behave differently from PostgreSQL, however. E.g. PostgreSQL returns an error `integer out of range` on `SELECT 200 + 200` but I think the coercion will avoid this error by upcasting to 64-bit integers. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
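The overflow-versus-upcast trade-off in this thread can be shown concretely. A minimal sketch (not DataFusion code): adding two `u8` values either wraps (release mode) or panics (debug mode), while widening both operands first, as the PR's coercion does at the plan level, preserves the mathematical result at the cost of wider intermediates.

```rust
fn main() {
    let (a, b): (u8, u8) = (200, 200);

    // Un-coerced: 200 + 200 = 400 wraps to 400 mod 256 = 144.
    let wrapped = a.wrapping_add(b);

    // Coerced: widen to u16 before adding, like c(u8, u16) + c(u8, u16)
    // in Dandandan's notation above.
    let widened = a as u16 + b as u16;

    println!("wrapped = {}, widened = {}", wrapped, widened); // 144, 400
}
```

This is also why PostgreSQL's `integer out of range` error disappears under coercion: the addition simply never happens at the narrow width.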
[GitHub] [arrow] alamb edited a comment on pull request #9596: ARROW-11495: [Rust] Better numerical_coercion
alamb edited a comment on pull request #9596: URL: https://github.com/apache/arrow/pull/9596#issuecomment-788171536 I just tried a quick test on postgres which I think says postgres doesn't upcast integer types:

```sql
alamb=# create table foo (x smallint);
CREATE TABLE
alamb=# create table bar as select x * x from foo;
SELECT 0
alamb=# \d bar
               Table "public.bar"
  Column  |   Type   | Collation | Nullable | Default
----------+----------+-----------+----------+---------
 ?column? | smallint |           |          |

alamb=#
```

If a calculation overflows the maximum value of a `smallint`, postgres produces a runtime error rather than automatically upcasting:

```sql
alamb=# insert into foo values (32767);
INSERT 0 1
alamb=# select x + x from foo;
ERROR:  smallint out of range
alamb=#
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] github-actions[bot] commented on pull request #9608: ARROW-11742: [Rust][DataFusion] Add Expr::is_null and Expr::is_not_nu…
github-actions[bot] commented on pull request #9608: URL: https://github.com/apache/arrow/pull/9608#issuecomment-788171798 https://issues.apache.org/jira/browse/ARROW-11742 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] alamb commented on pull request #9596: ARROW-11495: [Rust] Better numerical_coercion
alamb commented on pull request #9596: URL: https://github.com/apache/arrow/pull/9596#issuecomment-788171536 I just tried a quick test on postgres which I think says postgres doesn't upcast integer types:

```
alamb=# create table foo (x smallint);
CREATE TABLE
alamb=# create table bar as select x * x from foo;
SELECT 0
alamb=# \d bar
               Table "public.bar"
  Column  |   Type   | Collation | Nullable | Default
----------+----------+-----------+----------+---------
 ?column? | smallint |           |          |

alamb=#
```

If a calculation overflows the maximum value of a `smallint`, postgres produces a runtime error rather than automatically upcasting:

```
alamb=# insert into foo values (32767);
INSERT 0 1
alamb=# select x + x from foo;
ERROR:  smallint out of range
alamb=#
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] elferherrera commented on a change in pull request #9567: ARROW-11775: [Rust][DataFusion] Feature Flags for Dependencies
elferherrera commented on a change in pull request #9567: URL: https://github.com/apache/arrow/pull/9567#discussion_r584929370

## File path: rust/datafusion/src/physical_plan/functions.rs

## @@ -447,6 +446,26 @@ pub fn return_type(

```diff
+#[cfg(feature = "crypto_expressions")]
+macro_rules! invoke_if_crypto_expressions_feature_flag {
+    ($FUNC:ident, $NAME:expr) => {{
+        use crate::physical_plan::crypto_expressions;
+        crypto_expressions::$FUNC
+    }};
+}
+
+#[cfg(not(feature = "crypto_expressions"))]
```

Review comment: I think you can put the `#[cfg(feature)]` inside the macro so you don't have to maintain two functions. What do you think?

```rust
macro_rules! invoke_if_crypto_expressions_feature_flag {
    ($FUNC:ident, $NAME:expr) => {
        #[cfg(not(feature = "crypto_expressions"))]
        {
            |_: &[ColumnarValue]| -> Result<ColumnarValue> {
                Err(DataFusionError::Internal(format!(
                    "function {} requires compilation with feature flag: crypto_expressions.",
                    $NAME
                )))
            }
        }
        #[cfg(feature = "crypto_expressions")]
        {
            use crate::physical_plan::crypto_expressions;
            crypto_expressions::$FUNC
        }
    };
}
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] jorgecarleitao commented on a change in pull request #9565: ARROW-11655: [Rust][DataFusion] Postgres String Functions: left, lpad, right, rpad
jorgecarleitao commented on a change in pull request #9565: URL: https://github.com/apache/arrow/pull/9565#discussion_r584926908

## File path: rust/datafusion/src/physical_plan/type_coercion.rs

## @@ -168,20 +168,35 @@ fn maybe_data_types(

```diff
 pub fn can_coerce_from(type_into: &DataType, type_from: &DataType) -> bool {
     use self::DataType::*;
     match type_into {
-        Int8 => matches!(type_from, Int8),
-        Int16 => matches!(type_from, Int8 | Int16 | UInt8),
-        Int32 => matches!(type_from, Int8 | Int16 | Int32 | UInt8 | UInt16),
+        Int8 => matches!(type_from, Int8 | Utf8 | LargeUtf8),
```

Review comment: Another option is to add a new function `maybe_cast -> Result>`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] jorgecarleitao commented on a change in pull request #9565: ARROW-11655: [Rust][DataFusion] Postgres String Functions: left, lpad, right, rpad
jorgecarleitao commented on a change in pull request #9565: URL: https://github.com/apache/arrow/pull/9565#discussion_r584924393

## File path: rust/datafusion/src/physical_plan/string_expressions.rs

## @@ -361,10 +534,167 @@ pub fn ltrim(args: &[ArrayRef]) -> Result<ArrayRef> {

```diff
-/// Converts the string to all lower case.
-/// lower('TOM') = 'tom'
-pub fn lower(args: &[ColumnarValue]) -> Result<ColumnarValue> {
-    handle(args, |x| x.to_ascii_lowercase(), "lower")
+/// Returns last n characters in the string, or when n is negative, returns all but first |n| characters.
+/// right('abcde', 2) = 'de'
+pub fn right(args: &[ArrayRef]) -> Result<ArrayRef> {
+    let string_array: &StringArray = args[0]
+        .as_any()
+        .downcast_ref::<StringArray>()
+        .ok_or_else(|| {
+            DataFusionError::Internal("could not cast string to StringArray".to_string())
+        })?;
+
+    let n_array: &Int64Array = args[1]
+        .as_any()
+        .downcast_ref::<Int64Array>()
+        .ok_or_else(|| {
+            DataFusionError::Internal("could not cast n to Int64Array".to_string())
+        })?;
+
+    let result = string_array
+        .iter()
+        .enumerate()
+        .map(|(i, x)| {
+            if n_array.is_null(i) {
+                None
+            } else {
+                x.map(|x: &str| {
+                    let n: i64 = n_array.value(i);
```

Review comment: First of all, thanks a lot for this, @seddonm1 . Impressive. I do not have time to go through all of this, but I will try to comment over what I find.

Because we do not check that the lengths of the two arrays are equal, I think that this will read out of bounds whenever `n_array.len() < string_array.len()`. I would recommend that we do not use `.value` or `is_null` and instead only use the corresponding iterators and `zip`. The reason being that even if the iterators have different lengths, the resulting code is still sound, as the zipped iterator stops at the shortest iterator. More generally, I think we should set a specific guideline and not allow new code to use these `unsafe`-yet-safe APIs, or better, just mark them as deprecated so that people use the corresponding safe ones. This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] alamb commented on a change in pull request #9596: ARROW-11495: [Rust] Better numerical_coercion
alamb commented on a change in pull request #9596: URL: https://github.com/apache/arrow/pull/9596#discussion_r584917648

## File path: rust/datafusion/src/physical_plan/expressions/binary.rs

## @@ -700,9 +702,9 @@ mod tests {

```diff
             DataType::UInt32,
             vec![1u32, 2u32],
             Operator::Plus,
-            Int32Array,
```

Review comment: I reviewed the test changes in this file and they make sense to me.

## File path: rust/datafusion/src/physical_plan/expressions/coercion.rs

## @@ -44,6 +46,68 @@ pub fn is_numeric(dt: &DataType) -> bool {

```diff
+/// Get next byte size of the integer number
+fn next_size(size: usize) -> usize {
+    if size < 8_usize {
+        return size * 2;
+    }
+    size
+}
+
+/// Determine if a DataType is float or not
+pub fn is_floating(dt: &DataType) -> bool {
+    matches!(
+        dt,
+        DataType::Float16 | DataType::Float32 | DataType::Float64
+    )
+}
+
+pub fn is_integer(dt: &DataType) -> bool {
+    is_numeric(dt) && !is_floating(dt)
+}
+
+pub fn numeric_byte_size(dt: &DataType) -> Option<usize> {
+    match dt {
+        DataType::Int8 | DataType::UInt8 => Some(1),
+        DataType::Int16 | DataType::UInt16 | DataType::Float16 => Some(2),
+        DataType::Int32 | DataType::UInt32 | DataType::Float32 => Some(4),
+        DataType::Int64 | DataType::UInt64 | DataType::Float64 => Some(8),
+        _ => None,
+    }
+}
+
+pub fn construct_numeric_type(
+    is_signed: bool,
+    is_floating: bool,
+    byte_size: usize,
+) -> Option<DataType> {
+    match (is_signed, is_floating, byte_size) {
+        (false, false, 1) => Some(DataType::UInt8),
+        (false, false, 2) => Some(DataType::UInt16),
+        (false, false, 4) => Some(DataType::UInt32),
+        (false, false, 8) => Some(DataType::UInt64),
+        (false, true, 1) => Some(DataType::Float16),
+        (false, true, 2) => Some(DataType::Float16),
+        (false, true, 4) => Some(DataType::Float32),
+        (false, true, 8) => Some(DataType::Float64),
+        (true, false, 1) => Some(DataType::Int8),
+        (true, false, 2) => Some(DataType::Int16),
+        (true, false, 4) => Some(DataType::Int32),
+        (true, false, 8) => Some(DataType::Int64),
+        (true, true, 1) => Some(DataType::Float32),
+        (true, true, 2) => Some(DataType::Float32),
+        (true, true, 4) => Some(DataType::Float32),
+        (true, true, 8) => Some(DataType::Float64),
+
+        // TODO support bigint and decimal types, now we just let's overflow
```

Review comment (on the TODO): I think this is reasonable.

Review comment (on `is_floating`): I think it might be worth putting these functions on `DataType` itself -- `DataType::is_floating()` and `DataType::numeric_byte_size()` -- but we can always do that in a subsequent PR; it is not needed for this one. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
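To see how helpers like those reviewed above compose, here is a hedged, self-contained sketch of one plausible coercion rule (the actual dispatch in the PR may differ): take the larger byte size, and bump it one step when a signed/unsigned mix would otherwise lose range. `Num`, `info`, `construct`, and `coerce` are all stand-in names mirroring `numeric_byte_size`, `next_size`, and `construct_numeric_type` for integer types only.

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
enum Num { I8, I16, I32, I64, U8, U16, U32, U64 }

// (is_signed, byte_size), mirroring numeric_byte_size
fn info(n: Num) -> (bool, usize) {
    match n {
        Num::I8 => (true, 1), Num::I16 => (true, 2),
        Num::I32 => (true, 4), Num::I64 => (true, 8),
        Num::U8 => (false, 1), Num::U16 => (false, 2),
        Num::U32 => (false, 4), Num::U64 => (false, 8),
    }
}

fn next_size(size: usize) -> usize { if size < 8 { size * 2 } else { size } }

// mirrors construct_numeric_type, restricted to integers
fn construct(is_signed: bool, byte_size: usize) -> Num {
    match (is_signed, byte_size) {
        (true, 1) => Num::I8, (true, 2) => Num::I16,
        (true, 4) => Num::I32, (true, 8) => Num::I64,
        (false, 1) => Num::U8, (false, 2) => Num::U16,
        (false, 4) => Num::U32, _ => Num::U64,
    }
}

fn coerce(l: Num, r: Num) -> Num {
    let ((ls, lb), (rs, rb)) = (info(l), info(r));
    let signed = ls || rs;
    let mut size = lb.max(rb);
    // A signed/unsigned mix needs one more byte-size step to hold the
    // unsigned side's full range (u32 does not fit in i32).
    let unsigned_size = if ls { rb } else { lb };
    if ls != rs && size == unsigned_size {
        size = next_size(size);
    }
    construct(signed, size)
}

fn main() {
    println!("{:?}", coerce(Num::I32, Num::U32)); // I64
    println!("{:?}", coerce(Num::U8, Num::U8));   // U8
}
```

Note the 64-bit cap: `coerce(I64, U64)` still yields `I64`, which is exactly the overflow case the TODO in the reviewed code leaves open.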
[GitHub] [arrow] github-actions[bot] commented on pull request #9607: ARROW-7001: [C++] Develop threading APIs to accommodate nested parallelism
github-actions[bot] commented on pull request #9607: URL: https://github.com/apache/arrow/pull/9607#issuecomment-788130722 https://issues.apache.org/jira/browse/ARROW-7001 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] alamb commented on a change in pull request #9565: ARROW-11655: [Rust][DataFusion] Postgres String Functions: left, lpad, right, rpad
alamb commented on a change in pull request #9565: URL: https://github.com/apache/arrow/pull/9565#discussion_r584911188

## File path: rust/datafusion/src/physical_plan/type_coercion.rs

## @@ -168,20 +168,35 @@ fn maybe_data_types(

```diff
 pub fn can_coerce_from(type_into: &DataType, type_from: &DataType) -> bool {
     use self::DataType::*;
     match type_into {
-        Int8 => matches!(type_from, Int8),
-        Int16 => matches!(type_from, Int8 | Int16 | UInt8),
-        Int32 => matches!(type_from, Int8 | Int16 | Int32 | UInt8 | UInt16),
+        Int8 => matches!(type_from, Int8 | Utf8 | LargeUtf8),
```

Review comment:

> How about I revert the coercion logic from this PR and re-add the explicit cast in the tests.

Sounds like a great plan

> I think there is a major piece of work to fully address CAST/coercion.

I agree

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] alamb commented on pull request #9567: ARROW-11775: [Rust][DataFusion] Feature Flags for Dependencies
alamb commented on pull request #9567: URL: https://github.com/apache/arrow/pull/9567#issuecomment-788129010 @andygrove @Dandandan @elferherrera what do you think of this approach? I really like it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] alamb commented on a change in pull request #9567: ARROW-11775: [Rust][DataFusion] Feature Flags for Dependencies
alamb commented on a change in pull request #9567: URL: https://github.com/apache/arrow/pull/9567#discussion_r584908792 ## File path: rust/datafusion/src/physical_plan/functions.rs ## @@ -447,6 +446,26 @@ pub fn return_type( } } +#[cfg(feature = "crypto_expressions")] +macro_rules! invoke_if_crypto_expressions_feature_flag { Review comment: Nice! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] alamb commented on a change in pull request #9605: ARROW-11802: [Rust][DataFusion] Remove use of crossbeam channels to avoid potential deadlocks
alamb commented on a change in pull request #9605: URL: https://github.com/apache/arrow/pull/9605#discussion_r584901754 ## File path: rust/datafusion/src/physical_plan/repartition.rs ## @@ -48,9 +51,13 @@ pub struct RepartitionExec { input: Arc, /// Partitioning scheme to use partitioning: Partitioning, -/// Channels for sending batches from input partitions to output partitions -/// there is one entry in this Vec for each output partition -channels: Arc, Receiver)>>>, +/// Channels for sending batches from input partitions to output partitions. +/// Key is the partition number +channels: Arc< Review comment: Thanks @Dandandan The use is here: https://github.com/apache/arrow/pull/9605/files#diff-b9b79e3b35bc8bfb43040ada3a4382bd0a0017ca1b1e8135be8fb310ff095674R229 Basically this code sets up all input and outputs channels for all of the partitions and then hands out one receiver at a time in some arbitrary order (depending on the `partition` argument). `UnboundedReceiver` https://docs.rs/tokio/1.2.0/tokio/sync/mpsc/struct.UnboundedReceiver.html doesn't implement `Clone` (as it is multiple producer *single* consumer) I suspect with some more thought a different structure could be used, but I couldn't convince myself it was a valuable use of time. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
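The structure alamb describes (channels keyed by partition, with each receiver handed out exactly once) can be illustrated with `std::sync::mpsc` instead of tokio so it runs standalone; std receivers are likewise single-consumer and not `Clone`. `Channels` and `take_receiver` are hypothetical names for this sketch, not the `RepartitionExec` API.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

// One (sender, receiver) pair per output partition, keyed by partition number.
struct Channels {
    senders: HashMap<usize, Sender<String>>,
    receivers: HashMap<usize, Receiver<String>>,
}

impl Channels {
    fn new(partitions: usize) -> Self {
        let mut senders = HashMap::new();
        let mut receivers = HashMap::new();
        for p in 0..partitions {
            let (tx, rx) = channel();
            senders.insert(p, tx);
            receivers.insert(p, rx);
        }
        Channels { senders, receivers }
    }

    // Hand out the single receiver for `partition`. Because Receiver is not
    // Clone, the entry is *moved out* of the map; a second call returns None.
    fn take_receiver(&mut self, partition: usize) -> Option<Receiver<String>> {
        self.receivers.remove(&partition)
    }
}

fn main() {
    let mut ch = Channels::new(2);
    ch.senders[&1].send("batch-0".to_string()).unwrap();
    let rx = ch.take_receiver(1).unwrap();
    println!("{}", rx.recv().unwrap()); // batch-0
    assert!(ch.take_receiver(1).is_none());
}
```

This is the crux of the review thread: senders are cheap to clone and can fan out across input partitions, but each output partition's receiver has exactly one owner, so a map that gives up ownership on first request is a natural fit.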
[GitHub] [arrow] Dandandan commented on a change in pull request #9605: ARROW-11802: [Rust][DataFusion] Remove use of crossbeam channels to avoid potential deadlocks
Dandandan commented on a change in pull request #9605: URL: https://github.com/apache/arrow/pull/9605#discussion_r584891340 ## File path: rust/datafusion/src/physical_plan/repartition.rs ## @@ -48,9 +51,13 @@ pub struct RepartitionExec { input: Arc, /// Partitioning scheme to use partitioning: Partitioning, -/// Channels for sending batches from input partitions to output partitions -/// there is one entry in this Vec for each output partition -channels: Arc, Receiver)>>>, +/// Channels for sending batches from input partitions to output partitions. +/// Key is the partition number +channels: Arc< Review comment: So ok if it's needed, but I just wasn't sure :+1: This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] Dandandan commented on a change in pull request #9605: ARROW-11802: [Rust][DataFusion] Remove use of crossbeam channels to avoid potential deadlocks
Dandandan commented on a change in pull request #9605: URL: https://github.com/apache/arrow/pull/9605#discussion_r58450 ## File path: rust/datafusion/src/physical_plan/repartition.rs ## @@ -48,9 +51,13 @@ pub struct RepartitionExec { input: Arc, /// Partitioning scheme to use partitioning: Partitioning, -/// Channels for sending batches from input partitions to output partitions -/// there is one entry in this Vec for each output partition -channels: Arc, Receiver)>>>, +/// Channels for sending batches from input partitions to output partitions. +/// Key is the partition number +channels: Arc< Review comment: I didn't follow the answer completely; why do we need to remove it? I think `UnboundedSender` is cheap to clone and cheap to keep in memory for the duration of `RepartitionStream`? But maybe I would need to play with the code more to see why it is needed per se. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] NGA-TRAN opened a new pull request #9608: ARROW-11742: [Rust][DataFusion] Add Expr::is_null and Expr::is_not_nu…
NGA-TRAN opened a new pull request #9608: URL: https://github.com/apache/arrow/pull/9608 Implement is_null and is_not_null for DataFusion logical_plan's Expr specified in [ARROW-11742](https://issues.apache.org/jira/browse/ARROW-11742) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] emkornfield commented on a change in pull request #9582: PARQUET-1655: [C++] Fix comparison of Decimal values in statistics
emkornfield commented on a change in pull request #9582: URL: https://github.com/apache/arrow/pull/9582#discussion_r584864172 ## File path: cpp/src/parquet/statistics_test.cc ## @@ -111,21 +117,28 @@ TEST(Comparison, UnsignedByteArray) { } TEST(Comparison, SignedFLBA) { - int size = 10; + int size = 4; auto comparator = MakeComparator(Type::FIXED_LEN_BYTE_ARRAY, SortOrder::SIGNED, size); - std::string s1 = "Anti123456"; - std::string s2 = "Bunkd123456"; - FLBA s1flba = FLBAFromString(s1); - FLBA s2flba = FLBAFromString(s2); - ASSERT_TRUE(comparator->Compare(s1flba, s2flba)); + std::vector byte_values[] = { + {0x80, 0, 0, 0}, {0xFF, 0xFF, 0x01, 0}, {0xFF, 0xFF, 0x80, 0}, + {0xFF, 0xFF, 0xFF, 0x80}, {0xFF, 0xFF, 0xFF, 0xFF}, {0, 0, 0x01, 0x01}, + {0, 0x01, 0x01, 0}, {0x01, 0x01, 0, 0}}; + std::vector values_to_compare; + for (auto& bytes : byte_values) { +values_to_compare.emplace_back(FLBA(bytes.data())); + } - s1 = "Bünk123456"; - s2 = "Bunk123456"; - s1flba = FLBAFromString(s1); - s2flba = FLBAFromString(s2); - ASSERT_TRUE(comparator->Compare(s1flba, s2flba)); + for (size_t x = 0; x < values_to_compare.size(); x++) { +EXPECT_FALSE(comparator->Compare(values_to_compare[x], values_to_compare[x])) << x; +for (size_t y = x + 1; y < values_to_compare.size(); y++) { + EXPECT_TRUE(comparator->Compare(values_to_compare[x], values_to_compare[y])) + << x << " " << y; + EXPECT_FALSE(comparator->Compare(values_to_compare[y], values_to_compare[x])) + << y << " " << x; +} + } Review comment: I think for FLBA these tests should be fairly complete. The bytes can be interpreted as a big-endian two's complement number. I think I need to add an additional case that covers sign extension equality from the example I gave above. Is your concern about the representation here? Or would you like me to explicitly use util::Decimal* classes for the test cases? This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] westonpace opened a new pull request #9607: ARROW-7001: [C++] Develop threading APIs to accommodate nested parallelism
westonpace opened a new pull request #9607: URL: https://github.com/apache/arrow/pull/9607 Still very much a WIP, expect more details to come. This PR ports the dataset/scanner logic to async. It does not actually make any readers (e.g. parquet reader, ipc reader) async. This will technically solve ARROW-7001 but is also a step towards making the readers async. There should be a performance gain from this with datasets with fewer files than # of cores (since we can make the inner reads nested). There should be a performance gain from this above and beyond that for CSV datasets (since we have a proper async CSV). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] emkornfield commented on a change in pull request #9582: PARQUET-1655: [C++] Fix comparison of Decimal values in statistics
emkornfield commented on a change in pull request #9582: URL: https://github.com/apache/arrow/pull/9582#discussion_r584862571 ## File path: cpp/src/parquet/statistics.cc ## @@ -133,56 +136,95 @@ struct CompareHelper { static T Max(int type_length, const T& a, const T& b) { return Compare(0, a, b) ? b : a; } -}; // namespace parquet - -template -struct ByteLikeCompareHelperBase { - using T = ByteArrayType::c_type; - using PtrType = typename std::conditional::type; +}; - static T DefaultMin() { return {}; } - static T DefaultMax() { return {}; } - static T Coalesce(T val, T fallback) { return val; } +template +struct BinaryLikeComparer {}; - static inline bool Compare(int type_length, const T& a, const T& b) { -const auto* aptr = reinterpret_cast(a.ptr); -const auto* bptr = reinterpret_cast(b.ptr); -return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + b.len); +template +struct BinaryLikeComparer { + static bool Compare(int type_length, const T& a, const T& b) { +int a_length = value_length(type_length, a); +int b_length = value_length(type_length, b); +// Unsigned comparison is used for non-numeric types so straight +// lexicographic comparison makes sense. (a.ptr is always unsigned) +return std::lexicographical_compare(a.ptr, a.ptr + a_length, b.ptr, b.ptr + b_length); } +}; - static T Min(int type_length, const T& a, const T& b) { -if (a.ptr == nullptr) return b; -if (b.ptr == nullptr) return a; -return Compare(type_length, a, b) ? a : b; - } +template +struct BinaryLikeComparer { + static bool Compare(int type_length, const T& a, const T& b) { +// Signed comparison is used for integers encoded as big-endian two's +// complement integers (e.g. decimals). +int a_length = value_length(type_length, a); +int b_length = value_length(type_length, b); + +// At least one of the lengths is zero. 
+if (a_length == 0 || b_length == 0) { + return a_length == 0 && b_length > 0; +} - static T Max(int type_length, const T& a, const T& b) { -if (a.ptr == nullptr) return b; -if (b.ptr == nullptr) return a; -return Compare(type_length, a, b) ? b : a; +int8_t first_a = *a.ptr; +int8_t first_b = *b.ptr; +// We can short circuit for different signed numbers or +// for equal length bytes arrays that have different first bytes. +if ((0x80 & first_a) != (0x80 & first_b) || +(a_length == b_length && first_a != first_b)) { + return first_a < first_b; +} +// When the lengths are unequal and the numbers are of the same +// sign we need to extend the digits. Review comment: See comment above. I think I addressed it there. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] emkornfield commented on a change in pull request #9582: PARQUET-1655: [C++] Fix comparison of Decimal values in statistics
emkornfield commented on a change in pull request #9582: URL: https://github.com/apache/arrow/pull/9582#discussion_r584862144 ## File path: cpp/src/parquet/statistics.cc ## @@ -133,56 +136,95 @@ struct CompareHelper { static T Max(int type_length, const T& a, const T& b) { return Compare(0, a, b) ? b : a; } -}; // namespace parquet - -template -struct ByteLikeCompareHelperBase { - using T = ByteArrayType::c_type; - using PtrType = typename std::conditional::type; +}; - static T DefaultMin() { return {}; } - static T DefaultMax() { return {}; } - static T Coalesce(T val, T fallback) { return val; } +template +struct BinaryLikeComparer {}; - static inline bool Compare(int type_length, const T& a, const T& b) { -const auto* aptr = reinterpret_cast(a.ptr); -const auto* bptr = reinterpret_cast(b.ptr); -return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + b.len); +template +struct BinaryLikeComparer { + static bool Compare(int type_length, const T& a, const T& b) { +int a_length = value_length(type_length, a); +int b_length = value_length(type_length, b); +// Unsigned comparison is used for non-numeric types so straight +// lexicographic comparison makes sense. (a.ptr is always unsigned) +return std::lexicographical_compare(a.ptr, a.ptr + a_length, b.ptr, b.ptr + b_length); } +}; - static T Min(int type_length, const T& a, const T& b) { -if (a.ptr == nullptr) return b; -if (b.ptr == nullptr) return a; -return Compare(type_length, a, b) ? a : b; - } +template +struct BinaryLikeComparer { + static bool Compare(int type_length, const T& a, const T& b) { +// Signed comparison is used for integers encoded as big-endian two's +// complement integers (e.g. decimals). +int a_length = value_length(type_length, a); +int b_length = value_length(type_length, b); + +// At least one of the lengths is zero. 
+if (a_length == 0 || b_length == 0) { + return a_length == 0 && b_length > 0; +} - static T Max(int type_length, const T& a, const T& b) { -if (a.ptr == nullptr) return b; -if (b.ptr == nullptr) return a; -return Compare(type_length, a, b) ? b : a; +int8_t first_a = *a.ptr; +int8_t first_b = *b.ptr; +// We can short circuit for different signed numbers or +// for equal length bytes arrays that have different first bytes. +if ((0x80 & first_a) != (0x80 & first_b) || +(a_length == b_length && first_a != first_b)) { + return first_a < first_b; +} Review comment: It is not entirely clear to me all potential use cases for this, so this is somewhat defensive. Decimals can be encoded either variable width or fixed width within a parquet file. Simply comparing the first bytes fails when sign extending values. For instance `0xFF80` and `0x80` are equal. This scenario seems like it could arise in two cases: 1. Trying to compare statistics in encoded form between a fixed width encoded decimal column and a variable length encoded column (likely from two separate files). 2. The specification only says `The minimum number of bytes to store the unscaled value should be used.` Since this isn't a "must", I could imagine some implementations doing something wacky here and still being in compliance. The first scenario seems more likely but still potentially rare for this code to hit in practice, but like I said, I'm being defensive. If this reasoning seems correct to you, I can add a comment to this effect. I can also remove the code, but I think that might lead to bugs down the road (I quickly checked the java implementation and it looks like they have a notion of sign extension also). A third option is we could still short-circuit if neither first byte corresponded to a sign extended value (I would need to think about it, but I think this would work). This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
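The sign-extension rule under discussion — `0xFF80` and `0x80` are the same big-endian two's-complement value — can be made concrete with a small Python sketch. This is an illustration of the idea only: the helper names are invented, and it does not reproduce the PR's special-casing of zero-length buffers (an empty buffer is treated as zero here).

```python
def sign_extend(buf: bytes, width: int) -> bytes:
    """Pad a big-endian two's-complement value on the left to `width` bytes."""
    pad = b"\xff" if buf and buf[0] & 0x80 else b"\x00"
    return pad * (width - len(buf)) + buf

def signed_be_less(a: bytes, b: bytes) -> bool:
    """Compare two big-endian two's-complement values of possibly unequal length."""
    width = max(len(a), len(b), 1)
    a, b = sign_extend(a, width), sign_extend(b, width)
    # After sign extension both values have equal length; flipping the sign
    # bit of the first byte makes plain lexicographic comparison correct.
    a = bytes([a[0] ^ 0x80]) + a[1:]
    b = bytes([b[0] ^ 0x80]) + b[1:]
    return a < b

print(signed_be_less(b"\xff\x80", b"\x80"))  # False: both represent -128
```

Note how the sign extension step is exactly what makes the naive "compare first bytes" shortcut unsafe for unequal lengths, which is the scenario the reviewer describes.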
[GitHub] [arrow] eerhardt commented on pull request #9356: ARROW-11422: [C#] add decimal support
eerhardt commented on pull request #9356: URL: https://github.com/apache/arrow/pull/9356#issuecomment-788083728 Can you also update https://github.com/apache/arrow/blob/master/docs/source/status.rst To check `Decimal128` and `Decimal256` support for C#? Also, should be able to remove `Decimal` from the `Not Implemented` section of https://github.com/apache/arrow/blob/master/csharp/README.md This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] eerhardt commented on a change in pull request #9356: ARROW-11422: [C#] add decimal support
eerhardt commented on a change in pull request #9356: URL: https://github.com/apache/arrow/pull/9356#discussion_r584834740 ## File path: csharp/src/Apache.Arrow/DecimalUtility.cs ## @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +using System; +using System.Linq; +using System.Numerics; +using System.Runtime.InteropServices; + +namespace Apache.Arrow +{ +/// +/// This is semi-optimised best attempt at converting to / from decimal and the buffers +/// +internal static class DecimalUtility +{ +private static readonly BigInteger _maxDecimal = new BigInteger(decimal.MaxValue); +private static readonly BigInteger _minDecimal = new BigInteger(decimal.MinValue); +private static readonly ulong[] s_powersOfTen = +{ +1, 10, 100, 1000, 1, 10, 100, 1000, 1, 10, 100, 1000, +1, 10, 100, 1000, 1, 10, +100, 1000 +}; +private static int PowersOfTenLength => s_powersOfTen.Length - 1; + +public static decimal GetDecimal(in ArrowBuffer valueBuffer, int index, int scale, int byteWidth, +bool isUnsigned = false) +{ +int startIndex = index * byteWidth; +ReadOnlySpan value = valueBuffer.Span.Slice(startIndex, byteWidth); +BigInteger integerValue; + +#if NETCOREAPP +integerValue = new BigInteger(value); +#else +integerValue = new BigInteger(value.ToArray()); +#endif + +if (integerValue > _maxDecimal || integerValue < _minDecimal) +{ +BigInteger scaleBy = BigInteger.Pow(10, scale); +BigInteger integerPart = BigInteger.DivRem(integerValue, scaleBy, out BigInteger fractionalPart); +if (integerPart > _maxDecimal || integerPart < _minDecimal) // decimal overflow, not much we can do here - C# needs a BigDecimal +{ +throw new OverflowException("Value: " + integerPart + " too big or too small to be represented as a decimal"); +} +return (decimal)integerPart + DivideByScale(fractionalPart, scale); +} +else +{ +return DivideByScale(integerValue, scale); +} +} + +private static decimal DivideByScale(BigInteger integerValue, int scale) +{ +decimal result = (decimal)integerValue; // this cast is safe here +int drop = scale; +while (drop > PowersOfTenLength) +{ +result /= s_powersOfTen[PowersOfTenLength]; +drop -= PowersOfTenLength; +} + +result /= s_powersOfTen[drop]; +return result; +} + +public static void 
GetBytes(BigInteger integerValue, int byteWidth, ref Span bytes) +{ +if (bytes.Length != byteWidth) +{ +throw new OverflowException("ValueBuffer size not equal to " + byteWidth + " byte width: " + bytes.Length); +} + +Span integerBytes = integerValue.ToByteArray().AsSpan(); +if (integerBytes.Length > byteWidth) +{ +throw new OverflowException("Decimal size greater than " + byteWidth + " bytes: " + integerBytes.Length); +} + +if (integerBytes.Length == byteWidth) +{ +bytes = integerBytes; +return; +} + +if (integerValue.Sign == -1) +{ +integerBytes.CopyTo(bytes); +for (int i = integerBytes.Length; i < byteWidth; i++) +{ +bytes[i] = 255; +} +} +else +{ +integerBytes.CopyTo(bytes); +} +} + +public static bool CheckPrecisionAndScale(decimal value, int precision, int scale, out BigInteger integerValue) +{ +DecimalLayout layout = new
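The conversion the C# helper performs — an unscaled integer plus a scale, with logical value unscaled × 10^(−scale) — is easy to state in Python, where an arbitrary-precision `Decimal` sidesteps the overflow handling the C# code needs. A hedged analogue with invented names, not the Arrow C# API:

```python
from decimal import Decimal

def unscaled_to_decimal(unscaled: int, scale: int) -> Decimal:
    # An Arrow decimal stores an unscaled integer and a scale;
    # the logical value is unscaled * 10**-scale.
    return Decimal(unscaled).scaleb(-scale)

def buffer_to_decimal(buf: bytes, scale: int) -> Decimal:
    # Decimal buffers are little-endian two's complement, matching
    # the `new BigInteger(value)` construction in the C# code.
    return unscaled_to_decimal(int.from_bytes(buf, "little", signed=True), scale)

print(buffer_to_decimal((12345).to_bytes(16, "little", signed=True), 2))  # 123.45
```

The C# version instead targets the 96-bit `decimal` type, which is why it must split off the integer part with `BigInteger.DivRem` and raise `OverflowException` when the value cannot be represented.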
[GitHub] [arrow] alamb commented on a change in pull request #9600: ARROW-11822: [Rust][Datafusion] Support case sensitive for function
alamb commented on a change in pull request #9600: URL: https://github.com/apache/arrow/pull/9600#discussion_r584779892 ## File path: rust/datafusion/src/execution/context.rs ## @@ -589,6 +598,10 @@ impl ContextProvider for ExecutionContextState { fn get_aggregate_meta(, name: ) -> Option> { self.aggregate_functions.get(name).cloned() } + +fn get_config() -> ExecutionConfig { +self.config.clone() +} Review comment: I wonder if there is any reason to copy the config or if the code could return a reference (and the caller could `clone()` it if they needed to ```suggestion fn config() -> { self.config } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] alamb commented on a change in pull request #9600: ARROW-11822: [Rust][Datafusion] Support case sensitive for function
alamb commented on a change in pull request #9600: URL: https://github.com/apache/arrow/pull/9600#discussion_r584777400 ## File path: rust/datafusion/src/sql/planner.rs ## @@ -927,7 +931,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } SQLExpr::Function(function) => { -let name: String = function.name.to_string(); +let name; +let input_name = function.name.to_string(); + +let case_sensitive = self.schema_provider.get_config().case_sensitive; +if !case_sensitive { +name = input_name.to_lowercase(); +} else { +name = input_name.clone(); +} Review comment: I think the following formulation would be more idiomatic rust (and save a copy of `input_name`). ```suggestion let input_name = function.name.to_string(); let case_sensitive = self.schema_provider.get_config().case_sensitive; let name = if !case_sensitive { input_name.to_lowercase(); } else { input_name; } ``` ## File path: rust/datafusion/src/execution/context.rs ## @@ -502,6 +502,8 @@ pub struct ExecutionConfig { pub concurrency: usize, /// Default batch size when reading data sources pub batch_size: usize, +/// Case sensitive Review comment: ```suggestion /// Will function names be searched using case-sensitive matching. /// If `false` both `"SELECT COUNT(*) FROM t;` and "`SELECT count(*) FROM t;` /// can be used to compute the `COUNT` aggregate. If `true` then only /// `"SELECT count(*) FROM t"` can be used. /// Defaults to `true` ``` ## File path: rust/arrow-flight/src/arrow.flight.protocol.rs ## @@ -499,8 +499,9 @@ pub mod flight_service_server { #[async_trait] pub trait FlightService: Send + Sync + 'static { #[doc = "Server streaming response type for the Handshake method."] -type HandshakeStream: Stream> Review comment: I don't think the changes in this file are needed, are they? 
## File path: rust/datafusion/src/execution/context.rs ## @@ -589,6 +598,10 @@ impl ContextProvider for ExecutionContextState { fn get_aggregate_meta(, name: ) -> Option> { self.aggregate_functions.get(name).cloned() } + +fn get_config() -> ExecutionConfig { +self.config.clone() +} Review comment: ```suggestion fn config() -> { self.config } ``` ## File path: rust/datafusion/src/sql/planner.rs ## @@ -343,7 +346,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } /// Generate a logic plan from an SQL select -fn select_to_plan(, select: ) -> Result { +pub fn select_to_plan(, select: ) -> Result { Review comment: Does this need to be made `pub`? ## File path: rust/datafusion/src/logical_plan/expr.rs ## @@ -160,20 +160,26 @@ pub enum Expr { }, /// Represents the call of a built-in scalar function with a set of arguments. ScalarFunction { +/// The input name of the function +input_name: String, Review comment: Also, I looked through the diff and I couldn't figure out what `input_name` is actually used for. Is this new field needed? ## File path: rust/datafusion/src/sql/planner.rs ## @@ -927,7 +931,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } SQLExpr::Function(function) => { -let name: String = function.name.to_string(); +let name; +let input_name = function.name.to_string(); + +let case_sensitive = self.schema_provider.get_config().case_sensitive; +if !case_sensitive { +name = input_name.to_lowercase(); +} else { +name = input_name.clone(); +} Review comment: This logic seems to implement case *IN*sensitive logic. Namely if `case_sensitive` is true, I would expect `count` != `COUNT` but this PR does the opposite. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] github-actions[bot] commented on pull request #9606: ARROW-10405: [C++] IsIn kernel should be able to lookup dictionary in string
github-actions[bot] commented on pull request #9606: URL: https://github.com/apache/arrow/pull/9606#issuecomment-788045846 https://issues.apache.org/jira/browse/ARROW-10405 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] rok opened a new pull request #9606: ARROW-10405: [C++] IsIn kernel should be able to lookup dictionary in string
rok opened a new pull request #9606: URL: https://github.com/apache/arrow/pull/9606 This is to resolve [ARROW-10405](https://issues.apache.org/jira/browse/ARROW-10405). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] jonkeane commented on a change in pull request #9591: ARROW-11729: [R] Add examples to the datasets documentation
jonkeane commented on a change in pull request #9591: URL: https://github.com/apache/arrow/pull/9591#discussion_r584794792 ## File path: r/R/dataset-write.R ## @@ -53,6 +53,17 @@ #' - `codec`: A [Codec] which will be used to compress body buffers of written #' files. Default (NULL) will not compress body buffers. #' @return The input `dataset`, invisibly +#' @examples +#' \donttest{ +#' # we can group by cyl, cyl and gear or even more variables +#' write_dataset(mtcars, tempdir(), "feather", partitioning = "cyl")) Review comment: https://issues.apache.org/jira/browse/ARROW-11582 is relevant here (we should make that error message nicer, and possibly default to parquet if we can without hurting anything else / needing to do a scan of the data / etc) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] pachamaltese commented on a change in pull request #9591: ARROW-11729: [R] Add examples to the datasets documentation
pachamaltese commented on a change in pull request #9591: URL: https://github.com/apache/arrow/pull/9591#discussion_r584784120 ## File path: r/R/dataset-write.R ## @@ -53,6 +53,17 @@ #' - `codec`: A [Codec] which will be used to compress body buffers of written #' files. Default (NULL) will not compress body buffers. #' @return The input `dataset`, invisibly +#' @examples +#' \donttest{ +#' # we can group by cyl, cyl and gear or even more variables +#' write_dataset(mtcars, tempdir(), "feather", partitioning = "cyl")) Review comment: sure, I'm using "parquet", because if I leave it blank I see ``` Error in if (format %in% c("csv", "text") || any(opt_names %in% c("delim", : missing value where TRUE/FALSE needed ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] bkietz commented on a change in pull request #9589: ARROW-11797: [C++][Dataset] Provide batch stream Scanner methods
bkietz commented on a change in pull request #9589: URL: https://github.com/apache/arrow/pull/9589#discussion_r584779485 ## File path: cpp/src/arrow/dataset/scanner.h ## @@ -163,12 +164,20 @@ class ARROW_DS_EXPORT Scanner { /// in a concurrent fashion and outlive the iterator. Result Scan(); + /// \brief Apply a visitor to each RecordBatch as it is scanned. If multiple + /// threads are used, the visitor will be invoked from those threads and is + /// responsible for any synchronization. + Status Scan(std::function)> visitor); Review comment: The difference is `ToBatches().Visit(visitor)` would invoke `visitor` exclusively on the thread which called `Visit()` whereas `Scan(visitor)` invokes `visitor` in the scan's thread pool. This method is speculative; I'm not sure we'd want to provide that but I included it as an example This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] bkietz commented on a change in pull request #9589: ARROW-11797: [C++][Dataset] Provide batch stream Scanner methods
bkietz commented on a change in pull request #9589: URL: https://github.com/apache/arrow/pull/9589#discussion_r584777161 ## File path: cpp/src/arrow/dataset/scanner.cc ## @@ -224,5 +225,100 @@ Result> Scanner::ToTable() { FlattenRecordBatchVector(std::move(state->batches))); } +struct ToBatchesState { + explicit ToBatchesState(size_t n_tasks) + : batches(n_tasks), task_drained(n_tasks, false) {} + + /// Protecting mutating accesses to batches + std::mutex mutex; + std::vector>> batches; + std::vector task_drained; + size_t pop_cursor = 0; + + void Push(std::shared_ptr b, size_t i_task) { +std::lock_guard lock(mutex); +if (batches.size() <= i_task) { + batches.resize(i_task + 1); + task_drained.resize(i_task + 1); +} +batches[i_task].push_back(std::move(b)); + } + + Status Finish(size_t position) { +std::lock_guard lock(mutex); +task_drained[position] = true; +return Status::OK(); + } + + std::shared_ptr Pop() { +std::unique_lock lock(mutex); +std::condition_variable().wait_for(lock, std::chrono::milliseconds{1}, [this] { Review comment: This is indeed an (embarrassing) error. The intent was to throttle checking the variable to 1ms intervals but I forgot to write the enclosing loop. I'll replace with a simple call to `wait()` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
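The corrected pattern the reviewer describes — waiting on a long-lived condition variable with a predicate, so every wakeup re-checks the state rather than trusting a one-shot timed wait on a temporary — looks like this in Python's `threading` (a generic sketch with invented names, not the PR's C++):

```python
import threading

class BatchQueue:
    def __init__(self):
        self._cv = threading.Condition()
        self._batches = []
        self._done = False

    def push(self, batch):
        with self._cv:
            self._batches.append(batch)
            self._cv.notify()

    def finish(self):
        with self._cv:
            self._done = True
            self._cv.notify_all()

    def pop(self):
        with self._cv:
            # wait_for re-evaluates the predicate after every wakeup,
            # so spurious wakeups can never return a stale result.
            self._cv.wait_for(lambda: self._batches or self._done)
            return self._batches.pop(0) if self._batches else None
```

Constructing a fresh `std::condition_variable()` per wait, as in the quoted code, also means `notify` calls from producers can never reach the waiter; keeping one member condition variable fixes that as well.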
[GitHub] [arrow] cyb70289 commented on a change in pull request #9604: ARROW-11567: [C++][Compute] Improve variance kernel precision
cyb70289 commented on a change in pull request #9604: URL: https://github.com/apache/arrow/pull/9604#discussion_r584741353 ## File path: cpp/src/arrow/compute/kernels/aggregate_test.cc ## @@ -1205,6 +1205,21 @@ TEST_F(TestVarStdKernelMergeStability, Basics) { #endif } +// Test round-off error +class TestVarStdKernelRoundOff : public TestPrimitiveVarStdKernel {}; + +TEST_F(TestVarStdKernelRoundOff, Basics) { + // build array: np.arange(321000, dtype='float64') + double value = 0; + ASSERT_OK_AND_ASSIGN( + auto array, ArrayFromBuilderVisitor(float64(), 321000, [&](DoubleBuilder* builder) { +builder->UnsafeAppend(value++); + })); + + // reference value from numpy.var() + this->AssertVarStdIs(*array, VarianceOptions{0}, 8586749999.916667); +} Review comment: It looks unnecessary, as internally `float32` is converted to `double` before calculation, and there's no `float32` summation. Will do some tests against numpy to see if an additional test is required. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
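For reference, the population variance that `numpy.var` returns for `np.arange(n)` has a simple closed form, which makes constants in tests like the one above easy to check by hand (a pure-Python check; numpy is not required):

```python
from statistics import pvariance

def arange_var(n: int) -> float:
    # Population variance (ddof=0) of 0, 1, ..., n-1 is (n*n - 1) / 12.
    return (n * n - 1) / 12

print(arange_var(5))  # 2.0
```

`VarianceOptions{0}` in the test means ddof=0, matching numpy's default population variance.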
[GitHub] [arrow] cyb70289 commented on a change in pull request #9604: ARROW-11567: [C++][Compute] Improve variance kernel precision
cyb70289 commented on a change in pull request #9604: URL: https://github.com/apache/arrow/pull/9604#discussion_r584732087 ## File path: cpp/src/arrow/compute/kernels/aggregate_var_std.cc ## @@ -30,6 +30,96 @@ namespace internal { namespace { using arrow::internal::int128_t; +using arrow::internal::VisitSetBitRunsVoid; + +// non-recursive pairwise summation for floating points +// https://en.wikipedia.org/wiki/Pairwise_summation +template +enable_if_t::value, SumType> SumArray( +const ArrayData& data, ValueFunc&& func) { + const int64_t data_size = data.length - data.GetNullCount(); + if (data_size == 0) { +return 0; + } + + // number of inputs to accumulate before merging with another block + constexpr int kBlockSize = 16; // same as numpy + // levels (tree depth) = ceil(log2(len)) + 1, a bit larger than necessary + const int levels = BitUtil::Log2(static_cast(data_size)) + 1; + // temporary summation per level + std::vector sum(levels); + // whether two summations are ready and should be reduced to upper level + // one bit for each level, bit0 -> level0, ... 
+ uint64_t mask = 0; + // level of root node holding the final summation + int root_level = 0; + + // reduce summation of one block (may be smaller than kBlockSize) from leaf node + // continue reducing to upper level if two summations are ready for non-leaf node + auto reduce = [&](SumType block_sum) { +int cur_level = 0; +uint64_t cur_level_mask = 1ULL; +sum[cur_level] += block_sum; +mask ^= cur_level_mask; +while ((mask & cur_level_mask) == 0) { + block_sum = sum[cur_level]; + sum[cur_level] = 0; + ++cur_level; + DCHECK_LT(cur_level, levels); + cur_level_mask <<= 1; + sum[cur_level] += block_sum; + mask ^= cur_level_mask; +} +root_level = std::max(root_level, cur_level); + }; + + const ValueType* values = data.GetValues(1); + VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length, + [&](int64_t pos, int64_t len) { +const ValueType* v = [pos]; +const int64_t blocks = len / kBlockSize; +const int64_t remains = len % kBlockSize; Review comment: Ah yes, missed this point, will change to shift/mask. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
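The scheme in the diff — sequential sums of `kBlockSize` inputs fed into a binary-counter merge, with one bit of `mask` per tree level — can be prototyped in Python. This is an illustrative sketch, not the C++ kernel: it keeps the diff's structure but drops the `root_level` bookkeeping and simply adds up whatever partial sums remain at the end.

```python
def pairwise_sum(values, block_size=16):
    """Non-recursive pairwise summation.

    Accumulates `block_size` inputs sequentially, then merges block sums up
    a binary tree; a bit mask tracks which levels hold a pending partial
    sum, exactly like a binary counter with carries.
    """
    n = len(values)
    if n == 0:
        return 0.0
    levels = n.bit_length()  # >= tree depth needed; a bit generous
    sums = [0.0] * levels
    mask = 0                 # bit i set => level i holds a pending partial sum

    def reduce_block(block_sum):
        nonlocal mask
        level = 0
        sums[level] += block_sum
        mask ^= 1 << level
        # a cleared bit means two partial sums met: carry them upward
        while not (mask >> level) & 1:
            carried, sums[level] = sums[level], 0.0
            level += 1
            sums[level] += carried
            mask ^= 1 << level

    for start in range(0, n, block_size):
        # plain sequential sum within a block, as in the kernel's inner loop
        reduce_block(sum(values[start:start + block_size]))

    # whatever partial sums remain pending add up to the total
    return sum(sums)
```

Because carries merge only equal-sized partial sums, the error grows as O(log n) rather than O(n) for naive sequential summation, which is the precision improvement the PR is after.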
[GitHub] [arrow] JkSelf commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
JkSelf commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r584729112 ## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; + +import io.netty.util.internal.PlatformDependent; + +/** + * Compression codec for the LZ4 algorithm. 
+ */ +public class Lz4CompressionCodec implements CompressionCodec { + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { +Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The uncompressed buffer size exceeds the integer limit"); + +if (uncompressedBuffer.writerIndex() == 0L) { + // shortcut for empty buffer + ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + compressedBuffer.setLong(0, 0); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + uncompressedBuffer.close(); + return compressedBuffer; +} + +try { + ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer); + long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; + if (compressedLength > uncompressedBuffer.writerIndex()) { +// compressed buffer is larger, send the raw buffer +compressedBuffer.close(); +compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer); + } + + uncompressedBuffer.close(); + return compressedBuffer; +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException { +byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; +PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex()); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +try (InputStream in = new ByteArrayInputStream(inBytes); + OutputStream out = new FramedLZ4CompressorOutputStream(baos)) { + IOUtils.copy(in, out); +} + +byte[] outBytes = baos.toByteArray(); + +ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + +long uncompressedLength = uncompressedBuffer.writerIndex(); +if (!MemoryUtil.LITTLE_ENDIAN) { + uncompressedLength = 
Long.reverseBytes(uncompressedLength); +} +// first 8 bytes reserved for uncompressed length, to be consistent with the +// C++ implementation. +compressedBuffer.setLong(0, uncompressedLength); + +PlatformDependent.copyMemory( +outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length); +compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); +return compressedBuffer; + } + + @Override + public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { +Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The compressed buffer size exceeds the integer limit"); + +Preconditions.checkArgument(compressedBuffer.writerIndex() >=
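The buffer layout the codec writes — an 8-byte little-endian uncompressed length followed by the compressed payload — can be sketched as follows. This is only an illustration of the framing: the constant name follows the Java code above, and `zlib` stands in for the LZ4 frame codec, which is a third-party library in Python.

```python
import struct
import zlib  # stand-in for the LZ4 frame codec used by the Java code

SIZE_OF_UNCOMPRESSED_LENGTH = 8  # matches CompressionUtil in the diff above

def compress_buffer(raw: bytes) -> bytes:
    # first 8 bytes hold the uncompressed length, little-endian,
    # to stay consistent with the C++ implementation
    return struct.pack('<q', len(raw)) + zlib.compress(raw)

def decompress_buffer(buf: bytes) -> bytes:
    (uncompressed_len,) = struct.unpack_from('<q', buf, 0)
    out = zlib.decompress(buf[SIZE_OF_UNCOMPRESSED_LENGTH:])
    assert len(out) == uncompressed_len
    return out

data = b'abc' * 1000
assert decompress_buffer(compress_buffer(data)) == data
```

Note the fallback in the Java code above: when the compressed form ends up larger than the input, the raw buffer is sent instead (via `CompressionUtil.compressRawBuffer`), so the length prefix is what tells the reader how to interpret the payload.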
[GitHub] [arrow] pitrou commented on a change in pull request #9604: ARROW-11567: [C++][Compute] Improve variance kernel precision
pitrou commented on a change in pull request #9604: URL: https://github.com/apache/arrow/pull/9604#discussion_r584720949 ## File path: cpp/src/arrow/compute/kernels/aggregate_var_std.cc ## @@ -30,6 +30,96 @@ namespace internal { namespace { using arrow::internal::int128_t; +using arrow::internal::VisitSetBitRunsVoid; + +// non-recursive pairwise summation for floating points +// https://en.wikipedia.org/wiki/Pairwise_summation +template <typename ValueType, typename SumType, typename ValueFunc> +enable_if_t<std::is_floating_point<SumType>::value, SumType> SumArray( +const ArrayData& data, ValueFunc&& func) { + const int64_t data_size = data.length - data.GetNullCount(); + if (data_size == 0) { +return 0; + } + + // number of inputs to accumulate before merging with another block + constexpr int kBlockSize = 16; // same as numpy + // levels (tree depth) = ceil(log2(len)) + 1, a bit larger than necessary + const int levels = BitUtil::Log2(static_cast<uint64_t>(data_size)) + 1; + // temporary summation per level + std::vector<SumType> sum(levels); + // whether two summations are ready and should be reduced to upper level + // one bit for each level, bit0 -> level0, ...
+ uint64_t mask = 0; + // level of root node holding the final summation + int root_level = 0; + + // reduce summation of one block (may be smaller than kBlockSize) from leaf node + // continue reducing to upper level if two summations are ready for non-leaf node + auto reduce = [&](SumType block_sum) { +int cur_level = 0; +uint64_t cur_level_mask = 1ULL; +sum[cur_level] += block_sum; +mask ^= cur_level_mask; +while ((mask & cur_level_mask) == 0) { + block_sum = sum[cur_level]; + sum[cur_level] = 0; + ++cur_level; + DCHECK_LT(cur_level, levels); + cur_level_mask <<= 1; + sum[cur_level] += block_sum; + mask ^= cur_level_mask; +} +root_level = std::max(root_level, cur_level); + }; + + const ValueType* values = data.GetValues<ValueType>(1); + VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length, + [&](int64_t pos, int64_t len) { +const ValueType* v = &values[pos]; +const int64_t blocks = len / kBlockSize; +const int64_t remains = len % kBlockSize; Review comment: If you're interested in performance, note that signed division and modulo are more complicated than a simple shift or mask: https://godbolt.org/z/MTz7eM ## File path: cpp/src/arrow/compute/kernels/aggregate_test.cc ## @@ -1205,6 +1205,21 @@ TEST_F(TestVarStdKernelMergeStability, Basics) { #endif } +// Test round-off error +class TestVarStdKernelRoundOff : public TestPrimitiveVarStdKernel<DoubleType> {}; + +TEST_F(TestVarStdKernelRoundOff, Basics) { + // build array: np.arange(321000, dtype='float64') + double value = 0; + ASSERT_OK_AND_ASSIGN( + auto array, ArrayFromBuilderVisitor(float64(), 321000, [&](DoubleBuilder* builder) { +builder->UnsafeAppend(value++); + })); + + // reference value from numpy.var() + this->AssertVarStdIs(*array, VarianceOptions{0}, 8586749999.916667); +} Review comment: Should we have a similar test for `float32` as well?
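The optimization pitrou points at rests on identities that hold for non-negative operands and a power-of-two divisor; compilers can only lower a division to a bare shift when the dividend is unsigned (or provably non-negative). A quick check of the identities themselves, with hypothetical values rather than Arrow code:

```python
K_BLOCK_SIZE = 16  # a power of two, so / and % can reduce to shift and mask
SHIFT = K_BLOCK_SIZE.bit_length() - 1  # 4
MASK = K_BLOCK_SIZE - 1                # 0b1111

# for non-negative dividends the identities hold exactly
for length in (0, 1, 15, 16, 17, 321000):
    assert length // K_BLOCK_SIZE == length >> SHIFT
    assert length % K_BLOCK_SIZE == length & MASK

# C's signed division truncates toward zero, so -1 / 16 == 0 in C,
# while an arithmetic right shift of -1 stays -1; a compiler dividing a
# possibly-negative signed value must emit extra fix-up instructions
def c_div(a, b):
    q = abs(a) // abs(b)
    return q if (a >= 0) == (b >= 0) else -q

assert c_div(-1, 16) == 0
assert -1 >> SHIFT == -1
```

When the divisor is a compile-time constant that is not a power of two, the shift/mask form is unavailable, but unsigned division by a constant still compiles to a cheaper multiply-and-shift sequence than the signed variant.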
[GitHub] [arrow] pitrou commented on a change in pull request #9582: PARQUET-1655: [C++] Fix comparison of Decimal values in statistics
pitrou commented on a change in pull request #9582: URL: https://github.com/apache/arrow/pull/9582#discussion_r584709632 ## File path: cpp/src/parquet/statistics.cc ## @@ -133,56 +136,95 @@ struct CompareHelper { static T Max(int type_length, const T& a, const T& b) { return Compare(0, a, b) ? b : a; } -}; // namespace parquet - -template <bool is_signed> -struct ByteLikeCompareHelperBase { - using T = ByteArrayType::c_type; - using PtrType = typename std::conditional<is_signed, const int8_t*, const uint8_t*>::type; +}; - static T DefaultMin() { return {}; } - static T DefaultMax() { return {}; } - static T Coalesce(T val, T fallback) { return val; } +template <typename T, bool is_signed> +struct BinaryLikeComparer {}; - static inline bool Compare(int type_length, const T& a, const T& b) { -const auto* aptr = reinterpret_cast<PtrType>(a.ptr); -const auto* bptr = reinterpret_cast<PtrType>(b.ptr); -return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + b.len); +template <typename T> +struct BinaryLikeComparer<T, /*is_signed=*/false> { + static bool Compare(int type_length, const T& a, const T& b) { +int a_length = value_length(type_length, a); +int b_length = value_length(type_length, b); +// Unsigned comparison is used for non-numeric types so straight +// lexicographic comparison makes sense. (a.ptr is always unsigned) +return std::lexicographical_compare(a.ptr, a.ptr + a_length, b.ptr, b.ptr + b_length); } +}; - static T Min(int type_length, const T& a, const T& b) { -if (a.ptr == nullptr) return b; -if (b.ptr == nullptr) return a; -return Compare(type_length, a, b) ? a : b; - } +template <typename T> +struct BinaryLikeComparer<T, /*is_signed=*/true> { + static bool Compare(int type_length, const T& a, const T& b) { +// Signed comparison is used for integers encoded as big-endian twos +// complement integers. (e.g. decimals). +int a_length = value_length(type_length, a); +int b_length = value_length(type_length, b); + +// At least one of the lengths is zero.
+if (a_length == 0 || b_length == 0) { + return a_length == 0 && b_length > 0; +} - static T Max(int type_length, const T& a, const T& b) { -if (a.ptr == nullptr) return b; -if (b.ptr == nullptr) return a; -return Compare(type_length, a, b) ? b : a; +int8_t first_a = *a.ptr; +int8_t first_b = *b.ptr; +// We can short circuit for different signed numbers or +// for equal length bytes arrays that have different first bytes. +if ((0x80 & first_a) != (0x80 & first_b) || +(a_length == b_length && first_a != first_b)) { + return first_a < first_b; +} +// When the lengths are unequal and the numbers are of the same +// sign we need to extend the digits. Review comment: I'm not sure why all this is needed. The first bytes are equal. Regardless of the sign, wouldn't it be sufficient to write e.g.: ```c++ return std::lexicographical_compare(a.ptr + 1, a.ptr + a_length, b.ptr + 1, b.ptr + b_length); ``` i.e. only the MSB in a two's complement integer needs special handling, the other bytes are ordered naturally. ## File path: cpp/src/parquet/statistics.cc ## @@ -133,56 +136,95 @@ struct CompareHelper { static T Max(int type_length, const T& a, const T& b) { return Compare(0, a, b) ? 
b : a; } -}; // namespace parquet - -template <bool is_signed> -struct ByteLikeCompareHelperBase { - using T = ByteArrayType::c_type; - using PtrType = typename std::conditional<is_signed, const int8_t*, const uint8_t*>::type; +}; - static T DefaultMin() { return {}; } - static T DefaultMax() { return {}; } - static T Coalesce(T val, T fallback) { return val; } +template <typename T, bool is_signed> +struct BinaryLikeComparer {}; - static inline bool Compare(int type_length, const T& a, const T& b) { -const auto* aptr = reinterpret_cast<PtrType>(a.ptr); -const auto* bptr = reinterpret_cast<PtrType>(b.ptr); -return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + b.len); +template <typename T> +struct BinaryLikeComparer<T, /*is_signed=*/false> { + static bool Compare(int type_length, const T& a, const T& b) { +int a_length = value_length(type_length, a); +int b_length = value_length(type_length, b); +// Unsigned comparison is used for non-numeric types so straight +// lexicographic comparison makes sense. (a.ptr is always unsigned) +return std::lexicographical_compare(a.ptr, a.ptr + a_length, b.ptr, b.ptr + b_length); } +}; - static T Min(int type_length, const T& a, const T& b) { -if (a.ptr == nullptr) return b; -if (b.ptr == nullptr) return a; -return Compare(type_length, a, b) ? a : b; - } +template <typename T> +struct BinaryLikeComparer<T, /*is_signed=*/true> { + static bool Compare(int type_length, const T& a, const T& b) { +// Signed comparison is used for integers encoded as big-endian twos +// complement integers. (e.g. decimals). +int a_length = value_length(type_length, a); +int b_length = value_length(type_length, b); + +// At least one of the lengths is zero. +if (a_length == 0 || b_length == 0) { + return a_length == 0 && b_length > 0; } - static T Max(int
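The digit-extension the quoted comment describes can be illustrated in Python. `sign_extend` and `be_signed_less` are hypothetical helpers, not the Parquet code, but they show why unequal-length big-endian two's-complement values need sign extension before a bytewise compare (the case a plain `lexicographical_compare` from byte 1 onward would mishandle):

```python
import itertools

def sign_extend(buf: bytes, n: int) -> bytes:
    """Pad a big-endian two's-complement value out to n bytes."""
    if not buf:
        return b'\x00' * n                  # empty encodes zero
    pad = b'\xff' if buf[0] & 0x80 else b'\x00'
    return pad * (n - len(buf)) + buf

def be_signed_less(a: bytes, b: bytes) -> bool:
    n = max(len(a), len(b), 1)
    ea, eb = sign_extend(a, n), sign_extend(b, n)
    if (ea[0] ^ eb[0]) & 0x80:              # different signs: negative sorts first
        return bool(ea[0] & 0x80)
    return ea < eb                          # same sign: bytewise compare is correct

# cross-check against Python's arbitrary-precision integers
values = [b'', b'\x01', b'\x7f', b'\x80', b'\xff', b'\x00\x80', b'\xff\x00', b'\x01\x00']
for a, b in itertools.product(values, repeat=2):
    expect = int.from_bytes(a, 'big', signed=True) < int.from_bytes(b, 'big', signed=True)
    assert be_signed_less(a, b) == expect
```

Once both operands have the same length, bytes after the sign byte order naturally, which is the intuition behind pitrou's suggested one-liner for the equal-length case.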
[GitHub] [arrow] pitrou commented on pull request #9590: ARROW-11801: [C++] Remove bad header guard in filesystem/type_fwd.h
pitrou commented on pull request #9590: URL: https://github.com/apache/arrow/pull/9590#issuecomment-787943446 Thank you @ianmcook !
[GitHub] [arrow] pitrou closed pull request #9590: ARROW-11801: [C++] Remove bad header guard in filesystem/type_fwd.h
pitrou closed pull request #9590: URL: https://github.com/apache/arrow/pull/9590
[GitHub] [arrow] pitrou closed pull request #9587: ARROW-11798: [Integration] Update testing submodule
pitrou closed pull request #9587: URL: https://github.com/apache/arrow/pull/9587
[GitHub] [arrow] pitrou commented on pull request #9587: ARROW-11798: [Integration] Update testing submodule
pitrou commented on pull request #9587: URL: https://github.com/apache/arrow/pull/9587#issuecomment-787941310 Thanks @nevi-me . I tested locally and the integration tests seem to pass.
[GitHub] [arrow] alamb commented on pull request #9523: ARROW-11687: [Rust][DataFusion] RepartitionExec Hanging
alamb commented on pull request #9523: URL: https://github.com/apache/arrow/pull/9523#issuecomment-787926067 Here is a proposed PR for removing crossbeam: https://github.com/apache/arrow/pull/9605
[GitHub] [arrow] alamb commented on a change in pull request #9605: ARROW-11802: [Rust][DataFusion] Remove use of crossbeam channels to avoid potential deadlocks
alamb commented on a change in pull request #9605: URL: https://github.com/apache/arrow/pull/9605#discussion_r584686515 ## File path: rust/datafusion/src/physical_plan/repartition.rs ## @@ -48,9 +51,13 @@ pub struct RepartitionExec { input: Arc<dyn ExecutionPlan>, /// Partitioning scheme to use partitioning: Partitioning, -/// Channels for sending batches from input partitions to output partitions -/// there is one entry in this Vec for each output partition -channels: Arc, Receiver)>>>, +/// Channels for sending batches from input partitions to output partitions. +/// Key is the partition number +channels: Arc< Review comment: Note @Dandandan asked about using a `HashMap` vs some other structure. It is a `HashMap` for reasons explained by @edrevo here: https://github.com/edrevo/arrow/commit/97c256c4f76b8185311f36a7b27e317588904a3a#r47626396
[GitHub] [arrow] alamb commented on a change in pull request #9605: ARROW-11802: [Rust][DataFusion] Remove use of crossbeam channels to avoid potential deadlocks
alamb commented on a change in pull request #9605: URL: https://github.com/apache/arrow/pull/9605#discussion_r584685022 ## File path: rust/datafusion/src/physical_plan/repartition.rs ## @@ -199,14 +211,12 @@ impl ExecutionPlan for RepartitionExec { } // notify each output partition that this input partition has no more data -for channel in channels.iter_mut().take(num_output_partitions) {-let tx = channel.0; +for (_, tx) in txs { tx.send(None) .map_err(|e| DataFusionError::Execution(e.to_string()))?; } Ok(()) }); -tokio::task::yield_now().await; Review comment: this is the workaround added in https://github.com/apache/arrow/pull/9580 and it is now removed in favor of what we think is the fix of the root cause
[GitHub] [arrow] github-actions[bot] commented on pull request #9605: ARROW-11802: [Rust][DataFusion] Remove use of crossbeam channels to avoid potential deadlocks
github-actions[bot] commented on pull request #9605: URL: https://github.com/apache/arrow/pull/9605#issuecomment-787923096 https://issues.apache.org/jira/browse/ARROW-11802
[GitHub] [arrow] alamb opened a new pull request #9605: ARROW-11802: [Rust][DataFusion] Remove use of crossbeam channels to avoid potential deadlocks
alamb opened a new pull request #9605: URL: https://github.com/apache/arrow/pull/9605 # Rationale As spotted / articulated by @edrevo in https://github.com/apache/arrow/pull/9523#issuecomment-786911328, the intermixing of `crossbeam` channels (not designed for `async` and able to block task threads) and `async` code such as DataFusion can lead to deadlock. At least one of the crossbeam uses predates DataFusion being async (e.g. the one in the parquet reader). The use of crossbeam in the repartition operator in #8982 may have resulted from the re-use of the same pattern. # Changes 1. Removes the use of crossbeam channels from DataFusion (in `RepartitionExec` and `ParquetExec`) and replaces them with tokio channels (which are designed for use in async code). 2. Removes the `crossbeam` dependency entirely. 3. Removes use of the `multi_thread`ed executor in tests (e.g. `#[tokio::test(flavor = "multi_thread")]`), which can mask hangs. # Kudos / Thanks This PR incorporates the work of @seddonm1 from https://github.com/apache/arrow/pull/9603 and @edrevo in https://github.com/edrevo/arrow/tree/remove-crossbeam (namely 97c256c4f76b8185311f36a7b27e317588904a3a). A big thanks to both of them for their help in this endeavor.
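The deadlock risk described above comes from blocking-channel operations parking an executor thread that async tasks need; scheduler-aware channels suspend the task instead. The per-partition channel layout can be sketched with a toy Python `asyncio` analogue (hypothetical partition count and data, not DataFusion code — `asyncio.Queue` plays the role of the tokio channels):

```python
import asyncio

async def repartition_demo():
    # one channel per output partition, keyed by partition number --
    # a rough analogue of the tokio mpsc channels the PR switches to
    channels = {p: asyncio.Queue() for p in range(2)}

    async def producer():
        for i in range(4):
            await channels[i % 2].put(i)   # round-robin repartitioning
        for q in channels.values():
            await q.put(None)              # "no more data" sentinel

    async def consumer(p):
        out = []
        while (item := await channels[p].get()) is not None:
            out.append(item)
        return out

    prod = asyncio.create_task(producer())
    results = await asyncio.gather(consumer(0), consumer(1))
    await prod
    return results

print(asyncio.run(repartition_demo()))  # [[0, 2], [1, 3]]
```

Because `await q.put(...)` and `await q.get()` yield control back to the event loop, a full or empty channel can never wedge the scheduler the way a blocking crossbeam `send`/`recv` can on a bounded thread pool.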
[GitHub] [arrow] romainfrancois commented on pull request #8650: ARROW-10570: [R] Use Converter API to convert SEXP to Array/ChunkedArray
romainfrancois commented on pull request #8650: URL: https://github.com/apache/arrow/pull/8650#issuecomment-787890349 I've put a fix in place, at least momentarily
[GitHub] [arrow] alamb commented on pull request #9523: ARROW-11687: [Rust][DataFusion] RepartitionExec Hanging
alamb commented on pull request #9523: URL: https://github.com/apache/arrow/pull/9523#issuecomment-787878542 PR is https://github.com/apache/arrow/pull/9603 for anyone else following along
[GitHub] [arrow] alamb commented on a change in pull request #9603: ARROW-11687: [Rust][DataFusion] RepartitionExec Hanging Test
alamb commented on a change in pull request #9603: URL: https://github.com/apache/arrow/pull/9603#discussion_r584636939 ## File path: rust/datafusion/src/physical_plan/repartition.rs ## @@ -415,4 +415,32 @@ mod tests { } Ok(output_partitions) } + +#[tokio::test(flavor = "multi_thread")] Review comment: Thanks @seddonm1 -- as you say this test hangs when I comment out the call to ``` tokio::task::yield_now().await; ```
[GitHub] [arrow] alamb commented on pull request #9572: ARROW-11779: [Rust] make alloc module public
alamb commented on pull request #9572: URL: https://github.com/apache/arrow/pull/9572#issuecomment-787871070 > I do think that an AlignedVec in Arrow could speed up some code and make some internal code easier to maintain. If you want I could clean it up and propose a PR. @ritchie46 I would enjoy helping review that code as well as `ChunkedArray`. I spent some time reviewing [`ChunkedArray`](https://github.com/ritchie46/polars/blob/6f2dfd43dbf7ab44cbd2904af57d3441a1b52853/polars/polars-core/src/chunked_array/mod.rs#L141-L157) and it looks very useful for something in IOx (the ingest path). Thank you for the pointers (and the well commented code)
[GitHub] [arrow] alamb commented on pull request #9601: ARROW-11825: [Rust][DataFusion] Add mimalloc as option to benchmarks
alamb commented on pull request #9601: URL: https://github.com/apache/arrow/pull/9601#issuecomment-787865359 Thanks @Dandandan