Sorry it took a while, but here is a repository I put together that should
reproducibly illustrate what I am seeing, and what I'd like to understand
better (if not improve)[1].

The linked source code [2] shows 2 places where I am collecting times
(using std::chrono::steady_clock in C++). The unexpected behavior is
mentioned at the end of the README.md [3]:

Applying a function on a Table: "Executed in 4.08 secs"
Applying the function on each "table chunk": "Executed in 58.67 secs"
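
The second figure is a sum of many short timed intervals, while the first is a single interval. A minimal sketch of that measurement pattern with std::chrono::steady_clock is below. It uses only the standard library, and SumColumn, TimeWhole, and TimeByChunk are hypothetical stand-ins for illustration, not the code in the repository.

```cpp
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <numeric>
#include <vector>

using Clock = std::chrono::steady_clock;
using Seconds = std::chrono::duration<double>;

// Hypothetical stand-in for the function being benchmarked:
// sums rows [begin, end) of one column.
double SumColumn(const std::vector<double>& values,
                 std::size_t begin, std::size_t end) {
    auto first = values.begin() + static_cast<std::ptrdiff_t>(begin);
    auto last = values.begin() + static_cast<std::ptrdiff_t>(end);
    return std::accumulate(first, last, 0.0);
}

// "By table": one timed call over the whole column.
Seconds TimeWhole(const std::vector<double>& values, double* result) {
    const auto start = Clock::now();
    *result = SumColumn(values, 0, values.size());
    return Clock::now() - start;
}

// "By chunk": time each chunk separately and add the durations together.
Seconds TimeByChunk(const std::vector<double>& values,
                    std::size_t chunk_rows, double* result) {
    Seconds total{0};
    *result = 0.0;
    for (std::size_t begin = 0; begin < values.size(); begin += chunk_rows) {
        const std::size_t end = std::min(begin + chunk_rows, values.size());
        const auto start = Clock::now();
        *result += SumColumn(values, begin, end);
        total += Clock::now() - start;
    }
    return total;
}
```

Both functions produce the same result for the same input, so any gap between the two durations comes from per-call costs rather than from the arithmetic itself.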

For the test data I included in this repository, the Table has the
following properties:
* 27118 rows
* 2152 columns
* 565 "table chunks"
* 48 rows per "table chunk"

Every column of the table should be identically chunked, so a "table chunk"
just refers to the same chunk of every column in the table. I call it a
"table chunk" in this email because I'm not sure if it is the same as a
RecordBatch or if RecordBatch lays the columns out differently. The reason
each column is chunked identically is so that the whole table can be split
across many key-values (written to a KeyValue store). In the code I provide
in the repository, the table is decomposed via: Table -> TableReader ->
RecordBatch -> Table::FromRecordBatches (vector of 1 RecordBatch) [4]. I
have found that applying functions to Tables and to ChunkedArrays is more
ergonomic than RecordBatches and Arrays (at least as of ~5.0.0).

So, what I want to understand is: what contributes to the roughly 14x
difference in performance between the two approaches?

Thanks for any help or advice you can give!

P.S. I will try to also add tensors to this for my own curiosity.


[1]: https://github.com/drin/arrow-partial-aggr
[2]: https://github.com/drin/arrow-partial-aggr/blob/mainline/src/simple.cpp#L108
[3]: https://github.com/drin/arrow-partial-aggr/blob/mainline/README.md
[4]: https://github.com/drin/arrow-partial-aggr/blob/mainline/src/simple.cpp#L123

Aldrin Montana
Computer Science PhD Student
UC Santa Cruz


On Fri, Mar 11, 2022 at 6:44 AM Niranda Perera <[email protected]>
wrote:

> Sorry, I think I missed the smart pointers in my response. They *should*
> be smart pointers, otherwise you'll lose the allocation when you go out of
> scope. It should have been:
> class MeanAggr {
>   int64_t count_;
>   vector<shared_ptr<Array>> sums_;
>   vector<shared_ptr<Array>> sum_squares_;
> };
>
> On Fri, Mar 11, 2022 at 3:16 AM Aldrin <[email protected]> wrote:
>
>> Actually, I think I understand now; I misread "extending the class
>> members". But I think the point got across--if I know my table has a single
>> chunk, then I can do the operations on the arrays and then I can wrap the
>> result in a ChunkedArray or Table. For each slice, I can just maintain the
>> results in a vector without smart pointers.
>>
>> I'll definitely try this. Thanks!
>>
>> Aldrin Montana
>> Computer Science PhD Student
>> UC Santa Cruz
>>
>>
>> On Thu, Mar 10, 2022 at 11:35 PM Aldrin <[email protected]> wrote:
>>
>>> I think there's one minor misunderstanding, but I like the essence of
>>> the feedback.
>>>
>>> To clarify, the MeanAggr::Accumulate function is used to gather over
>>> points of a sample, where a row is considered a sample, and columns are
>>> corresponding values, e.g.:
>>>
>>> columns (values) |  c0  |  c1  |  c2 |  c3 |   c4
>>> row 0 (sample 0) |   1  |   2  |   3 |   4 |     5
>>> row 1 (sample 1) |   1  |   4  |  27 | 256 |  3125
>>>
>>> For this tiny example, applying Accumulate "by slice" means that I apply
>>> it once on row 0, then again on row 1, and I add the times together. "By
>>> Table" means that I concatenate row 0 and row 1, then apply Accumulate on
>>> the resulting table. Combine isn't currently being considered (it's for
>>> when I split on columns). You can sort of see this in [1], but it also
>>> illustrates sequential calls of Accumulate instead of using Combine. I will
>>> explain this more in a reproducible example.
>>>
>>> Given the clarification, I am not sure if the suggested local
>>> calculations are helpful, but maybe you mean I shouldn't use so many shared
>>> pointers? Although, I do think I'll try reducing the code path by using
>>> Arrays when I'm applying to a Table that I know has only 1 chunk (because I
>>> have specified it that way). This seems like it should help isolate some of
>>> the overhead.
>>>
>>> Thanks for the feedback!
>>>
>>> [1]:
>>> https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/fb688531169421a5b5985d2cbfee100e793cae2f/resources/assets/TStatistic_Diagram.png
>>>
>>> Aldrin Montana
>>> Computer Science PhD Student
>>> UC Santa Cruz
>>>
>>>
>>> On Thu, Mar 10, 2022 at 7:49 PM Niranda Perera <[email protected]>
>>> wrote:
>>>
>>>> Okay, one thing I immediately see is that there are a lot of memory
>>>> allocations/deallocations happening in the approach you have given, IMO.
>>>> arrow::compute functions do not modify their inputs, so each result is
>>>> freshly allocated in memory, and when you update an existing shared_ptr,
>>>> you deallocate the previous buffers. This happens in both
>>>> MeanAggr::Combine and MeanAggr::Accumulate, and it could be a reason why
>>>> the split version is slower. The single-table version only has to go
>>>> through MeanAggr::Accumulate.
>>>>
>>>> If I may suggest an alternative approach, I'd do this for variance
>>>> calculation,
>>>> class MeanAggr{
>>>> int64_t count_;
>>>> vector<Array> sums_;
>>>> vector<Array> sum_squares_;
>>>> }
>>>>
>>>> At every Accumulate, I will calculate local sums and sums of squares, and
>>>> extend the class members with the resulting ChunkedArray's chunks (which
>>>> are Arrays).
>>>> At the end, I'll create ChunkedArrays from these vectors and use
>>>> E(x^2)-E(x)^2 to calculate the variance. I feel like this might reduce the
>>>> number of extra allocs and deallocs.
>>>>
>>>> On Thu, Mar 10, 2022 at 9:47 PM Aldrin <[email protected]> wrote:
>>>>
>>>>> You're correct with the first clarification. I am not (currently)
>>>>> slicing column-wise.
>>>>>
>>>>> And yes, I am calculating variance, mean, etc. so that I can calculate
>>>>> the t-statistic.
>>>>>
>>>>> Aldrin Montana
>>>>> Computer Science PhD Student
>>>>> UC Santa Cruz
>>>>>
>>>>>
>>>>> On Thu, Mar 10, 2022 at 5:16 PM Niranda Perera <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Or are you slicing column-wise?
>>>>>>
>>>>>> On Thu, Mar 10, 2022 at 8:14 PM Niranda Perera <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> From the looks of it, you are trying to calculate variance, mean,
>>>>>>> etc. over rows, is that right?
>>>>>>>
>>>>>>> I need to clarify a bit on this statement.
>>>>>>> "Where "by slice" is total time, summed from running the function on
>>>>>>> each slice and "by table" is the time of just running the function on
>>>>>>> the table concatenated from each slice."
>>>>>>> So, I assume you are originally using a `vector<shared_ptr<Table>>
>>>>>>> slices`. For the former case, you are passing each slice to
>>>>>>> `MeanAggr::Accumulate`, and for the latter case, you are calling
>>>>>>> arrow::Concatenate(slices) and passing the result as a single table?
>>>>>>>
>>>>>>> On Thu, Mar 10, 2022 at 7:41 PM Aldrin <[email protected]> wrote:
>>>>>>>
>>>>>>>> Oh, but the short answer is that I'm using: Add, Subtract, Divide,
>>>>>>>> Multiply, Power, and Absolute. Sometimes with both inputs being
>>>>>>>> ChunkedArrays, sometimes with 1 input being a ChunkedArray and the
>>>>>>>> other being a scalar.
>>>>>>>>
>>>>>>>> Aldrin Montana
>>>>>>>> Computer Science PhD Student
>>>>>>>> UC Santa Cruz
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Mar 10, 2022 at 4:38 PM Aldrin <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Niranda!
>>>>>>>>>
>>>>>>>>> Sure thing, I've linked to my code. [1] is essentially the
>>>>>>>>> function being called, and [2] is an example of a wrapper function
>>>>>>>>> (more in that file) I wrote to reduce boilerplate (to make [1] more
>>>>>>>>> readable). But, now that I look at [2] again, which I wrote before I
>>>>>>>>> really knew much about smart pointers, I wonder if some of what I
>>>>>>>>> benchmarked is overhead from misusing C++ structures?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1]:
>>>>>>>>> https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/58839eb921c53d17ac32129be6af214ae4b58a13/src/cpp/processing/statops.cpp#L96
>>>>>>>>> [2]:
>>>>>>>>> https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/58839eb921c53d17ac32129be6af214ae4b58a13/src/cpp/processing/numops.cpp#L18
>>>>>>>>>
>>>>>>>>> Aldrin Montana
>>>>>>>>> Computer Science PhD Student
>>>>>>>>> UC Santa Cruz
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Mar 10, 2022 at 4:30 PM Niranda Perera <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Aldrin,
>>>>>>>>>>
>>>>>>>>>> It would be helpful to know what sort of compute operators you
>>>>>>>>>> are using.
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 10, 2022, 19:12 Aldrin <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> I will work on a reproducible example.
>>>>>>>>>>>
>>>>>>>>>>> As a sneak peek, what I was seeing was the following (pasted in
>>>>>>>>>>> gmail, see [1] for markdown version):
>>>>>>>>>>>
>>>>>>>>>>> Table ID      | Columns | Rows  | Rows (slice) | Slice count | Time (ms) total; by slice | Time (ms) total; by table
>>>>>>>>>>> E-GEOD-100618 | 415     | 20631 | 299          | 69          | 644.065                   | 410
>>>>>>>>>>> E-GEOD-76312  | 2152    | 27120 | 48           | 565         | 25607.927                 | 2953
>>>>>>>>>>> E-GEOD-106540 | 2145    | 24480 | 45           | 544         | 25193.507                 | 3088
>>>>>>>>>>>
>>>>>>>>>>> Where "by slice" is total time, summed from running the function
>>>>>>>>>>> on each slice and "by table" is the time of just running the
>>>>>>>>>>> function on the table concatenated from each slice.
>>>>>>>>>>>
>>>>>>>>>>> The difference was large (but not *so* large) for ~70 iterations
>>>>>>>>>>> (1.5x); but for ~550 iterations (and 6x fewer rows, 5x more
>>>>>>>>>>> columns) the difference became significant (~10x).
>>>>>>>>>>>
>>>>>>>>>>> I will follow up here when I have a more reproducible example. I
>>>>>>>>>>> also started doing this before tensors were available, so I'll try
>>>>>>>>>>> to see how that changes performance.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> [1]:
>>>>>>>>>>> https://gist.github.com/drin/4b2e2ea97a07c9ad54647bcdc462611a
>>>>>>>>>>>
>>>>>>>>>>> Aldrin Montana
>>>>>>>>>>> Computer Science PhD Student
>>>>>>>>>>> UC Santa Cruz
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 10, 2022 at 2:32 PM Weston Pace <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> As far as I know (and my knowledge here may be dated) the compute
>>>>>>>>>>>> kernels themselves do not do any concurrency.  There are certainly
>>>>>>>>>>>> compute kernels that could benefit from concurrency in this manner
>>>>>>>>>>>> (many kernels naively so) and I think things are set up so that, if
>>>>>>>>>>>> we decide to tackle this feature, we could do so in a systematic
>>>>>>>>>>>> way (instead of writing something for each kernel).
>>>>>>>>>>>>
>>>>>>>>>>>> I believe that kernels, if given a unique kernel context,
>>>>>>>>>>>> should be thread safe.
>>>>>>>>>>>>
>>>>>>>>>>>> The streaming compute engine, on the other hand, does support
>>>>>>>>>>>> concurrency.  It is mostly driven by the scanner at the moment
>>>>>>>>>>>> (e.g. each batch we fetch from the scanner gets a fresh thread task
>>>>>>>>>>>> for running through the execution plan) but there is some
>>>>>>>>>>>> intra-node concurrency in the hash join and (I think) the hash
>>>>>>>>>>>> aggregate nodes.  This has been sufficient to saturate cores on the
>>>>>>>>>>>> benchmarks we run.  I know there is ongoing interest in
>>>>>>>>>>>> understanding and improving our concurrency here.
>>>>>>>>>>>>
>>>>>>>>>>>> The scanner supports concurrency.  It will typically fetch multiple
>>>>>>>>>>>> files at once and, for each file, it will fetch multiple batches at
>>>>>>>>>>>> once (assuming the file has more than one batch).
>>>>>>>>>>>>
>>>>>>>>>>>> > I see a large difference between the total time to apply
>>>>>>>>>>>> > compute functions to a single table (concatenated from many small
>>>>>>>>>>>> > tables) compared to applying compute functions to each sub-table
>>>>>>>>>>>> > in the composition.
>>>>>>>>>>>>
>>>>>>>>>>>> Which one is better?  Can you share a reproducible example?
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 10, 2022 at 12:01 PM Aldrin <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> >
>>>>>>>>>>>> > Hello!
>>>>>>>>>>>> >
>>>>>>>>>>>> > I'm wondering if there's any documentation that describes the
>>>>>>>>>>>> > concurrency/parallelism architecture for the compute API. I'd
>>>>>>>>>>>> > also be interested if there are recommended approaches for seeing
>>>>>>>>>>>> > performance of threads used by Arrow--should I try to check a
>>>>>>>>>>>> > processor ID and infer performance or are there particular tools
>>>>>>>>>>>> > that the community uses?
>>>>>>>>>>>> >
>>>>>>>>>>>> > Specifically, I am wondering if the concurrency is going to be
>>>>>>>>>>>> > different when using a ChunkedArray as an input compared to an
>>>>>>>>>>>> > Array or for ChunkedArrays with various chunk sizes (1 chunk vs
>>>>>>>>>>>> > tens or hundreds). I see a large difference between the total
>>>>>>>>>>>> > time to apply compute functions to a single table (concatenated
>>>>>>>>>>>> > from many small tables) compared to applying compute functions to
>>>>>>>>>>>> > each sub-table in the composition. I'm trying to figure out where
>>>>>>>>>>>> > that difference may come from and I'm wondering if it's related
>>>>>>>>>>>> > to parallelism within Arrow.
>>>>>>>>>>>> >
>>>>>>>>>>>> > I tried using the github issues and JIRA issues (e.g. [1]) as a
>>>>>>>>>>>> > way to sleuth the info, but I couldn't find anything. The pyarrow
>>>>>>>>>>>> > API seems to have functions I could try and use to figure it out
>>>>>>>>>>>> > (cpu_count and set_cpu_count), but that seems like a vague road.
>>>>>>>>>>>> >
>>>>>>>>>>>> > [1]: https://issues.apache.org/jira/browse/ARROW-12726
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thank you!
>>>>>>>>>>>> >
>>>>>>>>>>>> > Aldrin Montana
>>>>>>>>>>>> > Computer Science PhD Student
>>>>>>>>>>>> > UC Santa Cruz
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Niranda Perera
>>>>>>> https://niranda.dev/
>>>>>>> @n1r44 <https://twitter.com/N1R44>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Niranda Perera
>>>>>> https://niranda.dev/
>>>>>> @n1r44 <https://twitter.com/N1R44>
>>>>>>
>>>>>>
>>>>
>>>> --
>>>> Niranda Perera
>>>> https://niranda.dev/
>>>> @n1r44 <https://twitter.com/N1R44>
>>>>
>>>>
>
> --
> Niranda Perera
> https://niranda.dev/
> @n1r44 <https://twitter.com/N1R44>
>
>
