Okay, one thing I immediately see is that there are a lot of memory
allocations/deallocations happening in the approach you have given, IMO.
arrow::compute functions never mutate their inputs, so every answer is
freshly allocated in memory, and when you reassign an existing shared_ptr,
you deallocate the previous buffers. This happens in both MeanAggr::Combine
and MeanAggr::Accumulate, and it could be a reason why the split version is
slower; the single-table version only has to go through
MeanAggr::Accumulate.
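
To make the pattern concrete, here is a minimal sketch of a running-total
update of the kind I mean (UpdateSum is hypothetical, not your code):

#include <arrow/api.h>
#include <arrow/compute/api.h>

// Add() never reuses its inputs' buffers: the result is freshly
// allocated, and reassigning the shared_ptr releases the previous
// running total's buffers.
arrow::Status UpdateSum(std::shared_ptr<arrow::ChunkedArray>& running_sum,
                        const std::shared_ptr<arrow::ChunkedArray>& col) {
  ARROW_ASSIGN_OR_RAISE(arrow::Datum result,
                        arrow::compute::Add(running_sum, col));  // new allocation
  running_sum = result.chunked_array();  // old buffers deallocated here
  return arrow::Status::OK();
}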

If I may suggest an alternative approach, I'd do this for the variance
calculation:
class MeanAggr {
  int64_t count_;
  std::vector<std::shared_ptr<arrow::Array>> sums_;
  std::vector<std::shared_ptr<arrow::Array>> sum_squares_;
};

At every Accumulate, I would calculate the local sums and sums of squares,
and extend the class members with the resultant ChunkedArrays' chunks
(which are Arrays). At the end, I'd create ChunkedArrays from these vectors
and use Var(x) = E(x^2) - E(x)^2 to calculate the variance. I feel like
this might reduce the number of extra allocs and deallocs.
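
A rough (untested) sketch of what I mean, assuming row-wise statistics over
float64 columns and that every slice has the same set of columns (the names
Accumulate and Variance here are hypothetical, not drop-in replacements for
your code):

#include <arrow/api.h>
#include <arrow/compute/api.h>

namespace cp = arrow::compute;

class MeanAggr {
 public:
  // Fold one slice's columns into per-row sums and sums of squares,
  // then keep only the resulting chunks; nothing already accumulated
  // is touched or reallocated.
  arrow::Status Accumulate(const std::shared_ptr<arrow::Table>& slice) {
    arrow::Datum sum = slice->column(0);
    ARROW_ASSIGN_OR_RAISE(arrow::Datum sq_sum, cp::Multiply(sum, sum));

    for (int i = 1; i < slice->num_columns(); ++i) {
      arrow::Datum col = slice->column(i);
      ARROW_ASSIGN_OR_RAISE(sum, cp::Add(sum, col));
      ARROW_ASSIGN_OR_RAISE(arrow::Datum sq, cp::Multiply(col, col));
      ARROW_ASSIGN_OR_RAISE(sq_sum, cp::Add(sq_sum, sq));
    }

    // Extend the members with this slice's result chunks.
    for (const auto& chunk : sum.chunked_array()->chunks()) {
      sums_.push_back(chunk);
    }
    for (const auto& chunk : sq_sum.chunked_array()->chunks()) {
      sum_squares_.push_back(chunk);
    }
    count_ = slice->num_columns();  // observations per row; same for every slice
    return arrow::Status::OK();
  }

  // Var(x) = E(x^2) - E(x)^2, computed once at the very end.
  arrow::Result<arrow::Datum> Variance() const {
    auto sums    = std::make_shared<arrow::ChunkedArray>(sums_);
    auto squares = std::make_shared<arrow::ChunkedArray>(sum_squares_);
    const arrow::Datum n(static_cast<double>(count_));

    ARROW_ASSIGN_OR_RAISE(arrow::Datum mean, cp::Divide(sums, n));       // E(x)
    ARROW_ASSIGN_OR_RAISE(arrow::Datum mean_sq, cp::Divide(squares, n)); // E(x^2)
    ARROW_ASSIGN_OR_RAISE(arrow::Datum sq_mean, cp::Multiply(mean, mean));
    return cp::Subtract(mean_sq, sq_mean);
  }

 private:
  int64_t count_ = 0;
  std::vector<std::shared_ptr<arrow::Array>> sums_;
  std::vector<std::shared_ptr<arrow::Array>> sum_squares_;
};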

On Thu, Mar 10, 2022 at 9:47 PM Aldrin <[email protected]> wrote:

> You're correct with the first clarification. I am not (currently) slicing
> column-wise.
>
> And yes, I am calculating variance, mean, etc. so that I can calculate the
> t-statistic.
>
> Aldrin Montana
> Computer Science PhD Student
> UC Santa Cruz
>
>
> On Thu, Mar 10, 2022 at 5:16 PM Niranda Perera <[email protected]>
> wrote:
>
>> Or are you slicing column-wise?
>>
>> On Thu, Mar 10, 2022 at 8:14 PM Niranda Perera <[email protected]>
>> wrote:
>>
>>> From the looks of it, you are trying to calculate variance, mean, etc.
>>> over rows, is that right?
>>>
>>> I need a bit of clarification on this statement:
>>> "Where "by slice" is total time, summed from running the function on
>>> each slice and "by table" is the time of just running the function on the
>>> table concatenated from each slice."
>>> So, I assume you are originally using a `vector<shared_ptr<Table>>
>>> slices`. For the former case, you are passing each slice to
>>> `MeanAggr::Accumulate`, and for the latter case, you are calling
>>> arrow::ConcatenateTables(slices) and passing the result as a single table?
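>>>
>>> I.e., something like this (hypothetical names, just to confirm the
>>> shape of the benchmark):
>>>
>>>   // (1) "by slice": one Accumulate call per slice
>>>   for (const auto& slice : slices) {
>>>     ARROW_RETURN_NOT_OK(aggr.Accumulate(slice));
>>>   }
>>>
>>>   // (2) "by table": concatenate first, then a single Accumulate call
>>>   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table,
>>>                         arrow::ConcatenateTables(slices));
>>>   ARROW_RETURN_NOT_OK(aggr.Accumulate(table));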
>>>
>>> On Thu, Mar 10, 2022 at 7:41 PM Aldrin <[email protected]> wrote:
>>>
>>>> Oh, but the short answer is that I'm using: Add, Subtract, Divide,
>>>> Multiply, Power, and Absolute. Sometimes with both inputs being
>>>> ChunkedArrays, sometimes with 1 input being a ChunkedArray and the other
>>>> being a scalar.
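>>>>
>>>> Roughly this call pattern (a hypothetical fragment, not my exact
>>>> code):
>>>>
>>>>   #include <arrow/api.h>
>>>>   #include <arrow/compute/api.h>
>>>>
>>>>   namespace cp = arrow::compute;
>>>>
>>>>   arrow::Status Example(const std::shared_ptr<arrow::ChunkedArray>& col) {
>>>>     // ChunkedArray <op> ChunkedArray
>>>>     ARROW_ASSIGN_OR_RAISE(arrow::Datum centered, cp::Subtract(col, col));
>>>>     // ChunkedArray <op> Scalar
>>>>     ARROW_ASSIGN_OR_RAISE(arrow::Datum squared, cp::Power(col, arrow::Datum(2.0)));
>>>>     ARROW_ASSIGN_OR_RAISE(arrow::Datum scaled, cp::Divide(col, arrow::Datum(2.0)));
>>>>     ARROW_ASSIGN_OR_RAISE(arrow::Datum abs_diff, cp::AbsoluteValue(centered));
>>>>     return arrow::Status::OK();
>>>>   }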
>>>>
>>>> Aldrin Montana
>>>> Computer Science PhD Student
>>>> UC Santa Cruz
>>>>
>>>>
>>>> On Thu, Mar 10, 2022 at 4:38 PM Aldrin <[email protected]> wrote:
>>>>
>>>>> Hi Niranda!
>>>>>
>>>>> Sure thing, I've linked to my code. [1] is essentially the function
>>>>> being called, and [2] is an example of a wrapper function (more in that
>>>>> file) I wrote to reduce boilerplate (to make [1] more readable). But, now
>>>>> that I look at [2] again, which I wrote before I really knew much about
>>>>> smart pointers, I wonder if some of what I benchmarked is overhead from
>>>>> misusing C++ structures?
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>> [1]:
>>>>> https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/58839eb921c53d17ac32129be6af214ae4b58a13/src/cpp/processing/statops.cpp#L96
>>>>> [2]:
>>>>> https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/58839eb921c53d17ac32129be6af214ae4b58a13/src/cpp/processing/numops.cpp#L18
>>>>>
>>>>> Aldrin Montana
>>>>> Computer Science PhD Student
>>>>> UC Santa Cruz
>>>>>
>>>>>
>>>>> On Thu, Mar 10, 2022 at 4:30 PM Niranda Perera <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi Aldrin,
>>>>>>
>>>>>> It would be helpful to know what sort of compute operators you are
>>>>>> using.
>>>>>>
>>>>>> On Thu, Mar 10, 2022, 19:12 Aldrin <[email protected]> wrote:
>>>>>>
>>>>>>> I will work on a reproducible example.
>>>>>>>
>>>>>>> As a sneak peek, what I was seeing was the following (pasted in
>>>>>>> gmail, see [1] for markdown version):
>>>>>>>
>>>>>>> Table ID       Columns  Rows   Rows     Slice  Time (ms)        Time (ms)
>>>>>>>                               (slice)  count  total; by slice  total; by table
>>>>>>> E-GEOD-100618  415      20631  299      69     644.065          410
>>>>>>> E-GEOD-76312   2152     27120  48       565    25607.927        2953
>>>>>>> E-GEOD-106540  2145     24480  45       544    25193.507        3088
>>>>>>>
>>>>>>> Where "by slice" is total time, summed from running the function on
>>>>>>> each slice and "by table" is the time of just running the function on 
>>>>>>> the
>>>>>>> table concatenated from each slice.
>>>>>>>
>>>>>>> The difference was large (but not *so* large) for ~70 iterations
>>>>>>> (1.5x); but for ~550 iterations (and 6x fewer rows, 5x more columns) the
>>>>>>> difference became significant (~10x).
>>>>>>>
>>>>>>> I will follow up here when I have a more reproducible example. I
>>>>>>> also started doing this before tensors were available, so I'll try to 
>>>>>>> see
>>>>>>> how that changes performance.
>>>>>>>
>>>>>>>
>>>>>>> [1]: https://gist.github.com/drin/4b2e2ea97a07c9ad54647bcdc462611a
>>>>>>>
>>>>>>> Aldrin Montana
>>>>>>> Computer Science PhD Student
>>>>>>> UC Santa Cruz
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 10, 2022 at 2:32 PM Weston Pace <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> As far as I know (and my knowledge here may be dated) the compute
>>>>>>>> kernels themselves do not do any concurrency.  There are certainly
>>>>>>>> compute kernels that could benefit from concurrency in this manner
>>>>>>>> (many kernels naively so) and I think things are set up so that, if
>>>>>>>> we
>>>>>>>> decide to tackle this feature, we could do so in a systematic way
>>>>>>>> (instead of writing something for each kernel).
>>>>>>>>
>>>>>>>> I believe that kernels, if given a unique kernel context, should be
>>>>>>>> thread safe.
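>>>>>>>>
>>>>>>>> For example, something along these lines should be safe (a sketch,
>>>>>>>> untested):
>>>>>>>>
>>>>>>>>   #include <arrow/compute/api.h>
>>>>>>>>   #include <thread>
>>>>>>>>
>>>>>>>>   void Worker(arrow::Datum a, arrow::Datum b) {
>>>>>>>>     arrow::compute::ExecContext ctx;  // one context per thread
>>>>>>>>     auto result = arrow::compute::CallFunction("add", {a, b}, &ctx);
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   // std::thread t1(Worker, x, y);
>>>>>>>>   // std::thread t2(Worker, w, z);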
>>>>>>>>
>>>>>>>> The streaming compute engine, on the other hand, does support
>>>>>>>> concurrency.  It is mostly driven by the scanner at the moment (e.g.
>>>>>>>> each batch we fetch from the scanner gets a fresh thread task for
>>>>>>>> running through the execution plan) but there is some intra-node
>>>>>>>> concurrency in the hash join and (I think) the hash aggregate nodes.
>>>>>>>> This has been sufficient to saturate cores on the benchmarks we run.
>>>>>>>> I know there is ongoing interest in understanding and improving our
>>>>>>>> concurrency here.
>>>>>>>>
>>>>>>>> The scanner supports concurrency.  It will typically fetch multiple
>>>>>>>> files at once and, for each file, it will fetch multiple batches at
>>>>>>>> once (assuming the file has more than one batch).
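>>>>>>>>
>>>>>>>> If it helps while investigating, the CPU thread pool that drives
>>>>>>>> this is global, and you can query/resize it from C++ (the
>>>>>>>> counterpart of pyarrow's cpu_count/set_cpu_count mentioned below);
>>>>>>>> a fragment:
>>>>>>>>
>>>>>>>>   #include <arrow/util/thread_pool.h>
>>>>>>>>
>>>>>>>>   int capacity = arrow::GetCpuThreadPoolCapacity();
>>>>>>>>   // e.g., pin to one thread to take parallelism out of a comparison:
>>>>>>>>   arrow::Status st = arrow::SetCpuThreadPoolCapacity(1);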
>>>>>>>>
>>>>>>>> > I see a large difference between the total time to apply compute
>>>>>>>> functions to a single table (concatenated from many small tables) 
>>>>>>>> compared
>>>>>>>> to applying compute functions to each sub-table in the composition.
>>>>>>>>
>>>>>>>> Which one is better?  Can you share a reproducible example?
>>>>>>>>
>>>>>>>> On Thu, Mar 10, 2022 at 12:01 PM Aldrin <[email protected]> wrote:
>>>>>>>> >
>>>>>>>> > Hello!
>>>>>>>> >
>>>>>>>> > I'm wondering if there's any documentation that describes the
>>>>>>>> concurrency/parallelism architecture for the compute API. I'd also be
>>>>>>>> interested if there are recommended approaches for seeing performance 
>>>>>>>> of
>>>>>>>> threads used by Arrow--should I try to check a processor ID and infer
>>>>>>>> performance or are there particular tools that the community uses?
>>>>>>>> >
>>>>>>>> > Specifically, I am wondering if the concurrency is going to be
>>>>>>>> different when using a ChunkedArray as an input compared to an Array 
>>>>>>>> or for
>>>>>>>> ChunkedArrays with various chunk sizes (1 chunk vs tens or hundreds). 
>>>>>>>> I see
>>>>>>>> a large difference between the total time to apply compute functions 
>>>>>>>> to a
>>>>>>>> single table (concatenated from many small tables) compared to applying
>>>>>>>> compute functions to each sub-table in the composition. I'm trying to
>>>>>>>> figure out where that difference may come from and I'm wondering if 
>>>>>>>> it's
>>>>>>>> related to parallelism within Arrow.
>>>>>>>> >
>>>>>>>> > I tried using the GitHub issues and JIRA issues (e.g. [1]) as a
>>>>>>>> way to sleuth the info, but I couldn't find anything. The pyarrow API 
>>>>>>>> seems
>>>>>>>> to have functions I could try and use to figure it out (cpu_count and
>>>>>>>> set_cpu_count), but that seems like a vague road.
>>>>>>>> >
>>>>>>>> > [1]: https://issues.apache.org/jira/browse/ARROW-12726
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > Thank you!
>>>>>>>> >
>>>>>>>> > Aldrin Montana
>>>>>>>> > Computer Science PhD Student
>>>>>>>> > UC Santa Cruz
>>>>>>>>
>>>>>>>
>>>
>>> --
>>> Niranda Perera
>>> https://niranda.dev/
>>> @n1r44 <https://twitter.com/N1R44>
>>>
>>>
>>
>> --
>> Niranda Perera
>> https://niranda.dev/
>> @n1r44 <https://twitter.com/N1R44>
>>
>>

-- 
Niranda Perera
https://niranda.dev/
@n1r44 <https://twitter.com/N1R44>
