Ooh, [2] was very interesting to read.

I am also adding parameters to combine a specified number of chunks
together (also uses combineChunks) and invoke the function on a specified
limit of columns. If desirable, I can add some of these descriptions and
such to the JIRA and maybe I can use it as an anchor to dig more into the
backing mechanisms.

I'll also move this discussion to a GitHub issue or dev@; I forgot that I
asked the initial questions on user@.

Thanks for taking the time to look at the repo and for the response!

Aldrin Montana
Computer Science PhD Student
UC Santa Cruz


On Wed, Mar 23, 2022 at 2:59 PM Weston Pace <[email protected]> wrote:

> Thank you for providing such an interesting (and enlightening)
> reproduction.  In both cases you are calling the compute functions exactly
> the same.  For example, you are adding two chunked arrays in both cases.
> The key difference appears to be that you are either making <1 call to
> `Add` with a chunked array with 565 chunks> or <565 calls to `Add` with a
> chunked array with 1 chunk>.
>
> One could theorize that the underlying code is somehow allocating or
> iterating more efficiently when given a large batch of input data.
> However, running through a profiler, that does not appear to be the case.
>
> Instead I think the C++ implementation is simply inefficient at the moment
> and our function overhead is too high.  The cost is spread out very
> diffusely throughout the entire compute module.  I have created [1] to get
> a better grasp on this.  The details of the fixes can be left for JIRA
> and/or the dev@ mailing list but reducing this overhead is pretty
> important for my current work so I hope to be able to look into it.  I'll
> also add that this is a pretty similar concern to [2] which Wes raised last
> summer.
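
To make the "1 call with 565 chunks" versus "565 calls with 1 chunk" distinction concrete, here is a minimal standard-library sketch. It does not use Arrow: `Chunked`, `AddChunked`, `MakeChunked`, `AddWholeTable`, and `AddPerChunk` are hypothetical stand-ins, and the per-call work shown (input checks plus a fresh output container) is only an assumed model of fixed function overhead, not a description of where Arrow actually spends its time.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Stand-in for a chunked array: a list of separately allocated chunks.
using Chunk = std::vector<double>;
using Chunked = std::vector<std::shared_ptr<Chunk>>;

// Hypothetical stand-in for a compute function such as `Add` (not Arrow's API).
// Every call checks its inputs and builds a fresh output container before it
// touches any data, so that work is repeated once per call.
Chunked AddChunked(const Chunked& lhs, const Chunked& rhs, std::size_t* call_count) {
  assert(lhs.size() == rhs.size());
  ++*call_count;
  Chunked out;
  out.reserve(lhs.size());
  for (std::size_t i = 0; i < lhs.size(); ++i) {
    assert(lhs[i]->size() == rhs[i]->size());
    auto sum = std::make_shared<Chunk>(lhs[i]->size());
    for (std::size_t j = 0; j < sum->size(); ++j) {
      (*sum)[j] = (*lhs[i])[j] + (*rhs[i])[j];
    }
    out.push_back(std::move(sum));
  }
  return out;
}

// Builds `num_chunks` chunks of `rows_per_chunk` rows, all set to `value`.
Chunked MakeChunked(std::size_t num_chunks, std::size_t rows_per_chunk, double value) {
  Chunked out;
  for (std::size_t i = 0; i < num_chunks; ++i) {
    out.push_back(std::make_shared<Chunk>(rows_per_chunk, value));
  }
  return out;
}

struct RunStats {
  std::size_t calls = 0;                // number of AddChunked invocations
  double total = 0.0;                   // sum of all output values
  std::chrono::nanoseconds elapsed{0};  // wall time on a monotonic clock
};

double SumAll(const Chunked& data) {
  double total = 0.0;
  for (const auto& chunk : data) {
    for (double v : *chunk) total += v;
  }
  return total;
}

// Strategy A: one call that covers every chunk.
RunStats AddWholeTable(const Chunked& a, const Chunked& b) {
  RunStats stats;
  const auto start = std::chrono::steady_clock::now();
  const Chunked result = AddChunked(a, b, &stats.calls);
  stats.elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(
      std::chrono::steady_clock::now() - start);
  stats.total = SumAll(result);
  return stats;
}

// Strategy B: one call per chunk, each on a single-chunk input.
RunStats AddPerChunk(const Chunked& a, const Chunked& b) {
  assert(a.size() == b.size());
  RunStats stats;
  std::vector<Chunked> results;
  const auto start = std::chrono::steady_clock::now();
  for (std::size_t i = 0; i < a.size(); ++i) {
    results.push_back(AddChunked(Chunked{a[i]}, Chunked{b[i]}, &stats.calls));
  }
  stats.elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(
      std::chrono::steady_clock::now() - start);
  for (const auto& r : results) stats.total += SumAll(r);
  return stats;
}
```

Both strategies touch the same rows and produce the same values; only the number of calls differs (1 versus the chunk count). Comparing `elapsed` between the two on inputs shaped like the reproduction (565 chunks of 48 rows) gives a rough feel for how much a fixed per-call cost matters, though the absolute numbers will say nothing about Arrow itself.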
>
> -Weston
>
> [1] https://issues.apache.org/jira/browse/ARROW-16014
> [2] https://lists.apache.org/thread/mp68ofm2hnvs2v2oz276rvw7y5kwqoyd
>
> On Sun, Mar 20, 2022 at 9:20 PM Aldrin <[email protected]> wrote:
>
>> Sorry it took a while, but here is a repository I put together that should
>> reproducibly illustrate what I am seeing, and what I'd like to understand
>> better (if not improve)[1].
>>
>> The linked source code [2] shows 2 places where I am collecting times
>> (using std::chrono::steady_clock in C++). The unexpected behavior is
>> mentioned at the end of the README.md [3]:
>>
>> Applying a function on a Table: "Executed in 4.08 secs"
>> Applying the function on each "table chunk": "Executed in 58.67 secs"
>>
>> For the test data I included in this repository, the Table has the
>> following properties:
>> * 27118 rows
>> * 2152 columns
>> * 565 "table chunks"
>> * 48 rows per "table chunk"
>>
>> Every column of the table should be identically chunked, so a "table
>> chunk" just refers to the same chunk of every column in the table. I call
>> it a "table chunk" in this email because I'm not sure if it is the same as
>> a RecordBatch or if RecordBatch lays the columns out differently. The
>> reason each column is chunked identically is so that the whole table can be
>> split across many key-values (written to a KeyValue store). In the code I
>> provide in the repository, the table is decomposed via: Table ->
>> TableReader -> RecordBatch -> Table::FromRecordBatches (vector of 1
>> RecordBatch) [4]. I have found that applying functions to Tables and to
>> ChunkedArrays is more ergonomic than RecordBatches and Arrays (at least as
>> of ~5.0.0).
>>
>> So, what I want to understand is what contributes to the difference in
>> performance?
>>
>> Thanks for any help or advice you can give!
>>
>> P.S. I will try to also add tensors to this for my own curiosity.
>>
>>
>> [1]: https://github.com/drin/arrow-partial-aggr
>> [2]:
>> https://github.com/drin/arrow-partial-aggr/blob/mainline/src/simple.cpp#L108
>> [3]: https://github.com/drin/arrow-partial-aggr/blob/mainline/README.md
>> [4]:
>> https://github.com/drin/arrow-partial-aggr/blob/mainline/src/simple.cpp#L123
>>
>> Aldrin Montana
>> Computer Science PhD Student
>> UC Santa Cruz
>>
>>
>> On Fri, Mar 11, 2022 at 6:44 AM Niranda Perera <[email protected]>
>> wrote:
>>
>>> Sorry, I think I missed the smart pointers in my response. It *should*
>>> be smart pointers, otherwise you'll lose the allocation when you go out of
>>> scope. It should have been,
>>> class MeanAggr {
>>>   int64_t count_;
>>>   vector<shared_ptr<Array>> sums_;
>>>   vector<shared_ptr<Array>> sum_squares_;
>>> };
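
A small standard-library sketch of the lifetime point above, assuming a hypothetical `FakeArray` in place of an Arrow array and a hypothetical `SumHolder` in place of `MeanAggr`: storing a `shared_ptr` in the member vector keeps the buffer alive after the local variable that created it goes out of scope.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical stand-in for an Arrow array: just an owned buffer of values.
struct FakeArray {
  std::vector<double> values;
};

// Holds results by shared_ptr so each result outlives the scope that made it.
struct SumHolder {
  std::vector<std::shared_ptr<FakeArray>> sums_;

  void Accumulate(const std::vector<double>& slice) {
    // `local` is created inside this call...
    auto local = std::make_shared<FakeArray>();
    local->values = slice;
    // ...but pushing the pointer shares ownership with `sums_`, so the
    // buffer is not freed when `local` is destroyed at the closing brace.
    sums_.push_back(local);
  }
};
```

After two calls to `Accumulate`, `sums_` holds two live buffers, each with a use count of 1, because the vector is the only remaining owner.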
>>>
>>> On Fri, Mar 11, 2022 at 3:16 AM Aldrin <[email protected]> wrote:
>>>
>>>> Actually, I think I understand now; I misread "extending the class
>>>> members". But I think the point got across--if I know my table has a single
>>>> chunk, then I can do the operations on the arrays and then I can wrap the
>>>> result in a ChunkedArray or Table. For each slice, I can just maintain the
>>>> results in a vector without smart pointers.
>>>>
>>>> I'll definitely try this. Thanks!
>>>>
>>>> Aldrin Montana
>>>> Computer Science PhD Student
>>>> UC Santa Cruz
>>>>
>>>>
>>>> On Thu, Mar 10, 2022 at 11:35 PM Aldrin <[email protected]> wrote:
>>>>
>>>>> I think there's one minor misunderstanding, but I like the essence of
>>>>> the feedback.
>>>>>
>>>>> To clarify, the MeanAggr::Accumulate function is used to gather over
>>>>> points of a sample, where a row is considered a sample, and columns are
>>>>> corresponding values, e.g.:
>>>>>
>>>>> columns (values) |  c0  |  c1  |  c2 |  c3 |   c4
>>>>> row 0 (sample 0) |   1  |   2  |   3 |   4 |     5
>>>>> row 1 (sample 1) |   1  |   4  |  27 | 256 |  3125
>>>>>
>>>>> For this tiny example, applying Accumulate "by slice" means that I
>>>>> apply it once on row 0, then again on row 1, and I add the times together.
>>>>> "By Table" means that I concatenate row 0 and row 1, then apply Accumulate
>>>>> on the resulting table. Combine isn't currently being considered (it's for
>>>>> when I split on columns). You can sort of see this in [1], but it also
>>>>> illustrates sequential calls of Accumulate instead of using Combine. I
>>>>> will explain this more in a reproducible example.
>>>>>
>>>>> Given the clarification, I am not sure if the suggested local
>>>>> calculations are helpful, but maybe you mean I shouldn't use so many
>>>>> shared pointers? Although, I do think I'll try reducing the code path by
>>>>> using Arrays when I'm applying to a Table that I know has only 1 chunk
>>>>> (because I have specified it that way). This seems like it should help
>>>>> isolate some of the overhead.
>>>>>
>>>>> Thanks for the feedback!
>>>>>
>>>>> [1]:
>>>>> https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/fb688531169421a5b5985d2cbfee100e793cae2f/resources/assets/TStatistic_Diagram.png
>>>>>
>>>>> Aldrin Montana
>>>>> Computer Science PhD Student
>>>>> UC Santa Cruz
>>>>>
>>>>>
>>>>> On Thu, Mar 10, 2022 at 7:49 PM Niranda Perera <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Okay, one thing I immediately see is that there are a lot of memory
>>>>>> allocations/deallocations happening in the approach you have given, IMO.
>>>>>> arrow::compute functions treat their inputs as immutable, so when you
>>>>>> get an answer, it is allocated freshly in memory, and when you update an
>>>>>> existing shared_ptr, you deallocate the previous buffers. This happens in
>>>>>> both MeanAggr::Combine and MeanAggr::Accumulate, and it could be a reason
>>>>>> why the split version is slower. The single-table version only has to go
>>>>>> through MeanAggr::Accumulate.
>>>>>>
>>>>>> If I may suggest an alternative approach, I'd do this for variance
>>>>>> calculation,
>>>>>> class MeanAggr {
>>>>>>   int64_t count_;
>>>>>>   vector<Array> sums_;
>>>>>>   vector<Array> sum_squares_;
>>>>>> };
>>>>>>
>>>>>> At every Accumulate, I will calculate local sums, sum squares, and
>>>>>> extend the class members with the resultant ChunkedArray's chunks (which
>>>>>> are Arrays).
>>>>>> At the end, I'll create some ChunkedArrays from these vectors, and use
>>>>>> E(x^2)-E(x)^2 to calculate the variance. I feel like this might reduce
>>>>>> the number of extra allocs and deallocs.
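
The E(x^2)-E(x)^2 idea can be sketched with the standard library alone. This is a simplification under stated assumptions: `MeanAggrSketch` and `Slice` are hypothetical names, and the sketch keeps one running scalar per column rather than vectors of Arrow Arrays as suggested above, so it shows the arithmetic and not the allocation behavior.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// One row-major slice of samples: slice[r][c] is column c of row r.
using Slice = std::vector<std::vector<double>>;

// Hypothetical simplification of MeanAggr: running per-column sums.
class MeanAggrSketch {
 public:
  explicit MeanAggrSketch(std::size_t num_columns)
      : count_(0), sums_(num_columns, 0.0), sum_squares_(num_columns, 0.0) {}

  // Adds every row of `slice` to the running sums and sums of squares.
  void Accumulate(const Slice& slice) {
    for (const auto& row : slice) {
      assert(row.size() == sums_.size());
      for (std::size_t c = 0; c < row.size(); ++c) {
        sums_[c] += row[c];
        sum_squares_[c] += row[c] * row[c];
      }
      ++count_;
    }
  }

  // Population variance per column, computed as E(x^2) - E(x)^2.
  std::vector<double> Variance() const {
    assert(count_ > 0);
    const double n = static_cast<double>(count_);
    std::vector<double> out(sums_.size());
    for (std::size_t c = 0; c < sums_.size(); ++c) {
      const double mean = sums_[c] / n;
      out[c] = sum_squares_[c] / n - mean * mean;
    }
    return out;
  }

 private:
  std::int64_t count_;
  std::vector<double> sums_;
  std::vector<double> sum_squares_;
};
```

Accumulating row 0 and row 1 of the tiny example from earlier in the thread one slice at a time gives the same per-column variance as accumulating them as a single two-row table, since only the running sums matter. Note that E(x^2)-E(x)^2 can lose precision to cancellation when the mean is large relative to the spread, so a production version may want a more stable formulation.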
>>>>>>
>>>>>> On Thu, Mar 10, 2022 at 9:47 PM Aldrin <[email protected]> wrote:
>>>>>>
>>>>>>> You're correct with the first clarification. I am not (currently)
>>>>>>> slicing column-wise.
>>>>>>>
>>>>>>> And yes, I am calculating variance, mean, etc. so that I can
>>>>>>> calculate the t-statistic.
>>>>>>>
>>>>>>> Aldrin Montana
>>>>>>> Computer Science PhD Student
>>>>>>> UC Santa Cruz
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 10, 2022 at 5:16 PM Niranda Perera <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Or are you slicing column-wise?
>>>>>>>>
>>>>>>>> On Thu, Mar 10, 2022 at 8:14 PM Niranda Perera <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> From the looks of it, you are trying to calculate variance, mean,
>>>>>>>>> etc over rows, isn't it?
>>>>>>>>>
>>>>>>>>> I need to clarify a bit on this statement.
>>>>>>>>> "Where "by slice" is total time, summed from running the function
>>>>>>>>> on each slice and "by table" is the time of just running the function
>>>>>>>>> on the table concatenated from each slice."
>>>>>>>>> So, I assume you are originally using a `vector<shared_ptr<Table>>
>>>>>>>>> slices`. For the former case, you are passing each slice to
>>>>>>>>> `MeanAggr::Accumulate`, and for the latter case, you are calling
>>>>>>>>> arrow::Concatenate(slices) and passing the result as a single table?
>>>>>>>>>
>>>>>>>>> On Thu, Mar 10, 2022 at 7:41 PM Aldrin <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Oh, but the short answer is that I'm using: Add, Subtract,
>>>>>>>>>> Divide, Multiply, Power, and Absolute. Sometimes with both inputs
>>>>>>>>>> being ChunkedArrays, sometimes with 1 input being a ChunkedArray and
>>>>>>>>>> the other being a scalar.
>>>>>>>>>>
>>>>>>>>>> Aldrin Montana
>>>>>>>>>> Computer Science PhD Student
>>>>>>>>>> UC Santa Cruz
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 10, 2022 at 4:38 PM Aldrin <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Niranda!
>>>>>>>>>>>
>>>>>>>>>>> Sure thing, I've linked to my code. [1] is essentially the
>>>>>>>>>>> function being called, and [2] is an example of a wrapper function
>>>>>>>>>>> (more in that file) I wrote to reduce boilerplate (to make [1] more
>>>>>>>>>>> readable). But, now that I look at [2] again, which I wrote before I
>>>>>>>>>>> really knew much about smart pointers, I wonder if some of what I
>>>>>>>>>>> benchmarked is overhead from misusing C++ structures?
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> [1]:
>>>>>>>>>>> https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/58839eb921c53d17ac32129be6af214ae4b58a13/src/cpp/processing/statops.cpp#L96
>>>>>>>>>>> [2]:
>>>>>>>>>>> https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/58839eb921c53d17ac32129be6af214ae4b58a13/src/cpp/processing/numops.cpp#L18
>>>>>>>>>>>
>>>>>>>>>>> Aldrin Montana
>>>>>>>>>>> Computer Science PhD Student
>>>>>>>>>>> UC Santa Cruz
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 10, 2022 at 4:30 PM Niranda Perera <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Aldrin,
>>>>>>>>>>>>
>>>>>>>>>>>> It would be helpful to know what sort of compute operators you
>>>>>>>>>>>> are using.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 10, 2022, 19:12 Aldrin <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I will work on a reproducible example.
>>>>>>>>>>>>>
>>>>>>>>>>>>> As a sneak peek, what I was seeing was the following (pasted
>>>>>>>>>>>>> in gmail, see [1] for markdown version):
>>>>>>>>>>>>>
>>>>>>>>>>>>> Table ID      | Columns | Rows  | Rows (slice) | Slice count | Time (ms) total; by slice | Time (ms) total; by table
>>>>>>>>>>>>> E-GEOD-100618 |     415 | 20631 |          299 |          69 |                   644.065 |                       410
>>>>>>>>>>>>> E-GEOD-76312  |    2152 | 27120 |           48 |         565 |                 25607.927 |                      2953
>>>>>>>>>>>>> E-GEOD-106540 |    2145 | 24480 |           45 |         544 |                 25193.507 |                      3088
>>>>>>>>>>>>> Where "by slice" is total time, summed from running the
>>>>>>>>>>>>> function on each slice and "by table" is the time of just running
>>>>>>>>>>>>> the function on the table concatenated from each slice.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The difference was large (but not *so* large) for ~70
>>>>>>>>>>>>> iterations (~1.6x); but for ~550 iterations (with 6x fewer rows per
>>>>>>>>>>>>> slice and 5x more columns) the difference became significant (~8-9x).
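
The ratios can be recomputed directly from the timings reported above. In this sketch the `Timing` struct and helper names are hypothetical, and `ExtraMsPerSlice` is only an assumed way to read the numbers: it attributes the whole gap between the two totals to a fixed cost per slice, which the thread has not established.

```cpp
#include <cassert>
#include <string>
#include <vector>

// One row of the reported timing table (field names are illustrative).
struct Timing {
  std::string table_id;
  int columns;
  int slice_count;
  double by_slice_ms;  // total time, summed over per-slice calls
  double by_table_ms;  // time for one call on the concatenated table
};

// How many times slower the per-slice approach was.
double Slowdown(const Timing& t) {
  assert(t.by_table_ms > 0.0);
  return t.by_slice_ms / t.by_table_ms;
}

// Gap between the two totals, spread evenly over the slices. This assumes
// the gap is a fixed per-slice cost, which is a guess, not a measurement.
double ExtraMsPerSlice(const Timing& t) {
  assert(t.slice_count > 0);
  return (t.by_slice_ms - t.by_table_ms) / t.slice_count;
}

// The three rows as reported in the message above.
std::vector<Timing> ReportedTimings() {
  return {
      {"E-GEOD-100618", 415, 69, 644.065, 410.0},
      {"E-GEOD-76312", 2152, 565, 25607.927, 2953.0},
      {"E-GEOD-106540", 2145, 544, 25193.507, 3088.0},
  };
}
```

With these inputs, `Slowdown` comes out at roughly 1.6 for the first table and between 8 and 9 for the other two.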
>>>>>>>>>>>>>
>>>>>>>>>>>>> I will follow up here when I have a more reproducible example.
>>>>>>>>>>>>> I also started doing this before tensors were available, so I'll
>>>>>>>>>>>>> try to see how that changes performance.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]:
>>>>>>>>>>>>> https://gist.github.com/drin/4b2e2ea97a07c9ad54647bcdc462611a
>>>>>>>>>>>>>
>>>>>>>>>>>>> Aldrin Montana
>>>>>>>>>>>>> Computer Science PhD Student
>>>>>>>>>>>>> UC Santa Cruz
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Mar 10, 2022 at 2:32 PM Weston Pace <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> As far as I know (and my knowledge here may be dated) the compute
>>>>>>>>>>>>>> kernels themselves do not do any concurrency.  There are certainly
>>>>>>>>>>>>>> compute kernels that could benefit from concurrency in this manner
>>>>>>>>>>>>>> (many kernels naively so) and I think things are set up so that, if
>>>>>>>>>>>>>> we decide to tackle this feature, we could do so in a systematic way
>>>>>>>>>>>>>> (instead of writing something for each kernel).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I believe that kernels, if given a unique kernel context,
>>>>>>>>>>>>>> should be thread safe.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The streaming compute engine, on the other hand, does support
>>>>>>>>>>>>>> concurrency.  It is mostly driven by the scanner at the moment (e.g.
>>>>>>>>>>>>>> each batch we fetch from the scanner gets a fresh thread task for
>>>>>>>>>>>>>> running through the execution plan) but there is some intra-node
>>>>>>>>>>>>>> concurrency in the hash join and (I think) the hash aggregate nodes.
>>>>>>>>>>>>>> This has been sufficient to saturate cores on the benchmarks we run.
>>>>>>>>>>>>>> I know there is ongoing interest in understanding and improving our
>>>>>>>>>>>>>> concurrency here.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The scanner supports concurrency.  It will typically fetch multiple
>>>>>>>>>>>>>> files at once and, for each file, it will fetch multiple batches at
>>>>>>>>>>>>>> once (assuming the file has more than one batch).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > I see a large difference between the total time to apply
>>>>>>>>>>>>>> > compute functions to a single table (concatenated from many small
>>>>>>>>>>>>>> > tables) compared to applying compute functions to each sub-table
>>>>>>>>>>>>>> > in the composition.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Which one is better?  Can you share a reproducible example?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Mar 10, 2022 at 12:01 PM Aldrin <[email protected]>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Hello!
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > I'm wondering if there's any documentation that describes the
>>>>>>>>>>>>>> > concurrency/parallelism architecture for the compute API. I'd also
>>>>>>>>>>>>>> > be interested if there are recommended approaches for seeing
>>>>>>>>>>>>>> > performance of threads used by Arrow--should I try to check a
>>>>>>>>>>>>>> > processor ID and infer performance or are there particular tools
>>>>>>>>>>>>>> > that the community uses?
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Specifically, I am wondering if the concurrency is going to be
>>>>>>>>>>>>>> > different when using a ChunkedArray as an input compared to an
>>>>>>>>>>>>>> > Array or for ChunkedArrays with various chunk sizes (1 chunk vs
>>>>>>>>>>>>>> > tens or hundreds). I see a large difference between the total time
>>>>>>>>>>>>>> > to apply compute functions to a single table (concatenated from
>>>>>>>>>>>>>> > many small tables) compared to applying compute functions to each
>>>>>>>>>>>>>> > sub-table in the composition. I'm trying to figure out where that
>>>>>>>>>>>>>> > difference may come from and I'm wondering if it's related to
>>>>>>>>>>>>>> > parallelism within Arrow.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > I tried using the github issues and JIRA issues (e.g. [1]) as a
>>>>>>>>>>>>>> > way to sleuth the info, but I couldn't find anything. The pyarrow
>>>>>>>>>>>>>> > API seems to have functions I could try and use to figure it out
>>>>>>>>>>>>>> > (cpu_count and set_cpu_count), but that seems like a vague road.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > [1]: https://issues.apache.org/jira/browse/ARROW-12726
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Thank you!
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Aldrin Montana
>>>>>>>>>>>>>> > Computer Science PhD Student
>>>>>>>>>>>>>> > UC Santa Cruz
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Niranda Perera
>>>>>>>>> https://niranda.dev/
>>>>>>>>> @n1r44 <https://twitter.com/N1R44>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Niranda Perera
>>>>>>>> https://niranda.dev/
>>>>>>>> @n1r44 <https://twitter.com/N1R44>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Niranda Perera
>>>>>> https://niranda.dev/
>>>>>> @n1r44 <https://twitter.com/N1R44>
>>>>>>
>>>>>>
>>>
>>> --
>>> Niranda Perera
>>> https://niranda.dev/
>>> @n1r44 <https://twitter.com/N1R44>
>>>
>>>
