Actually, I think I understand now; I misread "extending the class members". But I think the point got across: if I know my table has a single chunk, then I can do the operations on the Arrays directly and wrap the result in a ChunkedArray or Table at the end. For each slice, I can maintain the results in a vector without smart pointers.
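Concretely, the single-chunk path I have in mind looks something like the sketch below (assuming a numeric column; SquareColumn is just an illustrative name, not code from my repo):

    #include <arrow/api.h>
    #include <arrow/compute/api.h>

    // Example: square a column, unwrapping the lone Array when the
    // ChunkedArray is known to hold exactly one chunk.
    arrow::Result<std::shared_ptr<arrow::ChunkedArray>>
    SquareColumn(const std::shared_ptr<arrow::ChunkedArray>& col) {
      if (col->num_chunks() == 1) {
        // Single-chunk fast path: operate on the Array directly, then
        // wrap the result back into a ChunkedArray at the end.
        std::shared_ptr<arrow::Array> chunk = col->chunk(0);
        ARROW_ASSIGN_OR_RAISE(
            arrow::Datum squared,
            arrow::compute::CallFunction("multiply", {chunk, chunk}));
        return std::make_shared<arrow::ChunkedArray>(
            arrow::ArrayVector{squared.make_array()});
      }
      // General path: the kernel iterates over every chunk.
      ARROW_ASSIGN_OR_RAISE(
          arrow::Datum squared,
          arrow::compute::CallFunction("multiply", {col, col}));
      return squared.chunked_array();
    }

The general path stays in place for multi-chunk inputs, so correctness doesn't depend on the chunk layout.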
I'll definitely try this. Thanks!

Aldrin Montana
Computer Science PhD Student
UC Santa Cruz


On Thu, Mar 10, 2022 at 11:35 PM Aldrin <[email protected]> wrote:

> I think there's one minor misunderstanding, but I like the essence of the
> feedback.
>
> To clarify, the MeanAggr::Accumulate function is used to gather over the
> points of a sample, where a row is considered a sample and the columns are
> its corresponding values, e.g.:
>
>     columns (values) | c0 | c1 | c2 | c3  | c4
>     row 0 (sample 0) |  1 |  2 |  3 |   4 |    5
>     row 1 (sample 1) |  1 |  4 | 27 | 256 | 3125
>
> For this tiny example, applying Accumulate "by slice" means that I apply
> it once on row 0, then again on row 1, and add the times together. "By
> table" means that I concatenate row 0 and row 1, then apply Accumulate
> once on the resulting table. Combine isn't currently being considered
> (it's for when I split on columns). You can sort of see this in [1],
> though it illustrates sequential calls of Accumulate instead of using
> Combine. I will explain this more in a reproducible example.
>
> Given the clarification, I am not sure the suggested local calculations
> help, but maybe you mean I shouldn't use so many shared pointers? Still,
> I do think I'll try reducing the code path by using Arrays when I'm
> applying to a Table that I know has only 1 chunk (because I have
> specified it that way). That should help isolate some of the overhead.
>
> Thanks for the feedback!
>
> [1]:
> https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/fb688531169421a5b5985d2cbfee100e793cae2f/resources/assets/TStatistic_Diagram.png
>
> Aldrin Montana
> Computer Science PhD Student
> UC Santa Cruz
>
>
> On Thu, Mar 10, 2022 at 7:49 PM Niranda Perera <[email protected]> wrote:
>
>> Okay, one thing I immediately see is that there are a lot of memory
>> allocations/deallocations happening in the approach you have given, IMO.
>> arrow::compute methods are immutable, so when you get an answer, it is
>> freshly allocated in memory, and when you update an existing shared_ptr,
>> you deallocate the previous buffers. This happens in both
>> MeanAggr::Combine and MeanAggr::Accumulate, and it could be a reason why
>> the split version is slower. The single-table version only has to go
>> through MeanAggr::Accumulate once.
>>
>> If I may suggest an alternative approach, I'd do this for the variance
>> calculation:
>>
>>     class MeanAggr {
>>       int64_t count_;
>>       vector<Array> sums_;
>>       vector<Array> sum_squares_;
>>     };
>>
>> At every Accumulate, I would calculate local sums and sum squares, and
>> extend the class members with the resulting ChunkedArray's chunks (which
>> are Arrays). At the end, I'd create ChunkedArrays from these vectors and
>> use E(x^2) - E(x)^2 to calculate the variance. I feel like this might
>> reduce the number of extra allocs and deallocs.
>>
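For my own reference, here is how I read that suggestion as a single-column sketch (assuming float64 values and the "multiply"/"mean" kernels; everything beyond the class shape above is my interpretation):

    #include <arrow/api.h>
    #include <arrow/compute/api.h>

    // Buffer the chunks of x and x^2 as Arrays, defer the reduction to
    // the end, and compute Var(x) = E(x^2) - E(x)^2 exactly once.
    class MeanAggrSketch {
     public:
      arrow::Status Accumulate(const std::shared_ptr<arrow::ChunkedArray>& x) {
        for (const auto& chunk : x->chunks()) {
          ARROW_ASSIGN_OR_RAISE(
              arrow::Datum sq,
              arrow::compute::CallFunction("multiply", {chunk, chunk}));
          values_.push_back(chunk);             // keep x
          squares_.push_back(sq.make_array());  // keep x^2
          count_ += chunk->length();            // mirrors count_ above
        }
        return arrow::Status::OK();
      }

      arrow::Result<double> Variance() const {
        auto x  = std::make_shared<arrow::ChunkedArray>(values_);
        auto x2 = std::make_shared<arrow::ChunkedArray>(squares_);
        ARROW_ASSIGN_OR_RAISE(arrow::Datum m,
                              arrow::compute::CallFunction("mean", {x}));
        ARROW_ASSIGN_OR_RAISE(arrow::Datum m2,
                              arrow::compute::CallFunction("mean", {x2}));
        double ex =
            std::static_pointer_cast<arrow::DoubleScalar>(m.scalar())->value;
        double ex2 =
            std::static_pointer_cast<arrow::DoubleScalar>(m2.scalar())->value;
        return ex2 - ex * ex;  // E(x^2) - E(x)^2
      }

     private:
      int64_t count_ = 0;
      arrow::ArrayVector values_;
      arrow::ArrayVector squares_;
    };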
>> On Thu, Mar 10, 2022 at 9:47 PM Aldrin <[email protected]> wrote:
>>
>>> You're correct with the first clarification. I am not (currently)
>>> slicing column-wise.
>>>
>>> And yes, I am calculating variance, mean, etc. so that I can calculate
>>> the t-statistic.
>>>
>>> Aldrin Montana
>>> Computer Science PhD Student
>>> UC Santa Cruz
>>>
>>>
>>> On Thu, Mar 10, 2022 at 5:16 PM Niranda Perera <[email protected]> wrote:
>>>
>>>> Or are you slicing column-wise?
>>>>
>>>> On Thu, Mar 10, 2022 at 8:14 PM Niranda Perera <[email protected]> wrote:
>>>>
>>>>> From the looks of it, you are trying to calculate variance, mean,
>>>>> etc. over rows, isn't it?
>>>>>
>>>>> I need to clarify this statement a bit:
>>>>> "Where "by slice" is total time, summed from running the function on
>>>>> each slice and "by table" is the time of just running the function on
>>>>> the table concatenated from each slice."
>>>>> So, I assume you are originally using a vector<shared_ptr<Table>>
>>>>> slices. For the former case, you are passing each slice to
>>>>> MeanAggr::Accumulate, and for the latter case, you are calling
>>>>> arrow::ConcatenateTables(slices) and passing the result as a single
>>>>> table?
>>>>>
>>>>> On Thu, Mar 10, 2022 at 7:41 PM Aldrin <[email protected]> wrote:
>>>>>
>>>>>> Oh, but the short answer is that I'm using: Add, Subtract, Divide,
>>>>>> Multiply, Power, and Absolute. Sometimes both inputs are
>>>>>> ChunkedArrays; sometimes one input is a ChunkedArray and the other
>>>>>> is a scalar.
>>>>>>
>>>>>> Aldrin Montana
>>>>>> Computer Science PhD Student
>>>>>> UC Santa Cruz
>>>>>>
>>>>>>
>>>>>> On Thu, Mar 10, 2022 at 4:38 PM Aldrin <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Niranda!
>>>>>>>
>>>>>>> Sure thing, I've linked to my code. [1] is essentially the function
>>>>>>> being called, and [2] is an example of a wrapper function (more in
>>>>>>> that file) I wrote to reduce boilerplate (to make [1] more
>>>>>>> readable). But now that I look at [2] again, which I wrote before I
>>>>>>> really knew much about smart pointers, I wonder if some of what I
>>>>>>> benchmarked is overhead from misusing C++ structures?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> [1]:
>>>>>>> https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/58839eb921c53d17ac32129be6af214ae4b58a13/src/cpp/processing/statops.cpp#L96
>>>>>>> [2]:
>>>>>>> https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/58839eb921c53d17ac32129be6af214ae4b58a13/src/cpp/processing/numops.cpp#L18
>>>>>>>
>>>>>>> Aldrin Montana
>>>>>>> Computer Science PhD Student
>>>>>>> UC Santa Cruz
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 10, 2022 at 4:30 PM Niranda Perera <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Aldrin,
>>>>>>>>
>>>>>>>> It would be helpful to know what sort of compute operators you are
>>>>>>>> using.
>>>>>>>>
>>>>>>>> On Thu, Mar 10, 2022, 19:12 Aldrin <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> I will work on a reproducible example.
>>>>>>>>>
>>>>>>>>> As a sneak peek, here is what I was seeing (pasted in gmail; see
>>>>>>>>> [1] for the markdown version):
>>>>>>>>>
>>>>>>>>>     Table ID      | Columns | Rows  | Rows (slice) | Slice count | Time (ms; by slice) | Time (ms; by table)
>>>>>>>>>     E-GEOD-100618 |     415 | 20631 |          299 |          69 |             644.065 |                 410
>>>>>>>>>     E-GEOD-76312  |    2152 | 27120 |           48 |         565 |           25607.927 |                2953
>>>>>>>>>     E-GEOD-106540 |    2145 | 24480 |           45 |         544 |           25193.507 |                3088
>>>>>>>>>
>>>>>>>>> Where "by slice" is the total time, summed from running the
>>>>>>>>> function on each slice, and "by table" is the time of running the
>>>>>>>>> function once on the table concatenated from those slices.
>>>>>>>>>
>>>>>>>>> The difference was large (but not *so* large) for ~70 slices
>>>>>>>>> (1.5x); but for ~550 slices (and 6x fewer rows per slice, 5x more
>>>>>>>>> columns) the difference became significant (~10x).
>>>>>>>>>
>>>>>>>>> I will follow up here when I have a more reproducible example. I
>>>>>>>>> also started doing this before tensors were available, so I'll
>>>>>>>>> try to see how that changes performance.
>>>>>>>>>
>>>>>>>>> [1]: https://gist.github.com/drin/4b2e2ea97a07c9ad54647bcdc462611a
>>>>>>>>>
>>>>>>>>> Aldrin Montana
>>>>>>>>> Computer Science PhD Student
>>>>>>>>> UC Santa Cruz
>>>>>>>>>
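As a sketch of what those two timings compare (not the benchmark code itself; `accumulate` stands in for MeanAggr::Accumulate, and the concatenation cost is counted in the by-table time):

    #include <arrow/api.h>
    #include <chrono>
    #include <cstdio>
    #include <functional>
    #include <memory>
    #include <vector>

    using TableFn =
        std::function<arrow::Status(const std::shared_ptr<arrow::Table>&)>;

    arrow::Status CompareTimings(
        const std::vector<std::shared_ptr<arrow::Table>>& slices,
        const TableFn& accumulate) {
      using Clock = std::chrono::steady_clock;
      using Ms = std::chrono::duration<double, std::milli>;

      // "By slice": run the function once per slice; total the time.
      auto t0 = Clock::now();
      for (const auto& slice : slices) {
        ARROW_RETURN_NOT_OK(accumulate(slice));
      }
      double by_slice = Ms(Clock::now() - t0).count();

      // "By table": concatenate the slices, then run the function once.
      auto t1 = Clock::now();
      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> whole,
                            arrow::ConcatenateTables(slices));
      ARROW_RETURN_NOT_OK(accumulate(whole));
      double by_table = Ms(Clock::now() - t1).count();

      std::printf("by slice: %.3f ms | by table: %.3f ms\n",
                  by_slice, by_table);
      return arrow::Status::OK();
    }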
>>>>>>>>> On Thu, Mar 10, 2022 at 2:32 PM Weston Pace <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> As far as I know (and my knowledge here may be dated), the
>>>>>>>>>> compute kernels themselves do not do any concurrency. There are
>>>>>>>>>> certainly compute kernels that could benefit from concurrency in
>>>>>>>>>> this manner (many kernels naively so), and I think things are
>>>>>>>>>> set up so that, if we decide to tackle this feature, we could do
>>>>>>>>>> so in a systematic way (instead of writing something for each
>>>>>>>>>> kernel).
>>>>>>>>>>
>>>>>>>>>> I believe that kernels, if given a unique kernel context, should
>>>>>>>>>> be thread safe.
>>>>>>>>>>
>>>>>>>>>> The streaming compute engine, on the other hand, does support
>>>>>>>>>> concurrency. It is mostly driven by the scanner at the moment
>>>>>>>>>> (e.g. each batch we fetch from the scanner gets a fresh thread
>>>>>>>>>> task for running through the execution plan), but there is some
>>>>>>>>>> intra-node concurrency in the hash join and (I think) the hash
>>>>>>>>>> aggregate nodes. This has been sufficient to saturate cores on
>>>>>>>>>> the benchmarks we run. I know there is ongoing interest in
>>>>>>>>>> understanding and improving our concurrency here.
>>>>>>>>>>
>>>>>>>>>> The scanner supports concurrency. It will typically fetch
>>>>>>>>>> multiple files at once and, for each file, it will fetch
>>>>>>>>>> multiple batches at once (assuming the file has more than one
>>>>>>>>>> batch).
>>>>>>>>>>
>>>>>>>>>> > I see a large difference between the total time to apply
>>>>>>>>>> > compute functions to a single table (concatenated from many
>>>>>>>>>> > small tables) compared to applying compute functions to each
>>>>>>>>>> > sub-table in the composition.
>>>>>>>>>>
>>>>>>>>>> Which one is better? Can you share a reproducible example?
>>>>>>>>>>
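On the "unique kernel context" point, a minimal sketch of what per-thread contexts could look like when invoking a kernel directly (assuming arrow::compute::ExecContext, the "add" kernel, and int64 arrays; this is an illustration, not a vetted pattern):

    #include <arrow/api.h>
    #include <arrow/compute/api.h>
    #include <thread>
    #include <vector>

    // Each thread calls the kernel through its own ExecContext, so no
    // kernel state is shared between threads.
    void AddOnePerThread(
        const std::vector<std::shared_ptr<arrow::Array>>& inputs,
        std::vector<arrow::Datum>* results) {
      results->resize(inputs.size());
      std::vector<std::thread> workers;
      for (size_t i = 0; i < inputs.size(); ++i) {
        workers.emplace_back([&, i]() {
          arrow::compute::ExecContext ctx;  // one context per thread
          auto result = arrow::compute::CallFunction(
              "add", {inputs[i], arrow::Datum(int64_t{1})}, &ctx);
          if (result.ok()) {
            (*results)[i] = *result;  // each thread writes its own slot
          }
        });
      }
      for (auto& w : workers) w.join();
    }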
>>>>>>>>>> On Thu, Mar 10, 2022 at 12:01 PM Aldrin <[email protected]> wrote:
>>>>>>>>>> >
>>>>>>>>>> > Hello!
>>>>>>>>>> >
>>>>>>>>>> > I'm wondering if there's any documentation that describes the
>>>>>>>>>> > concurrency/parallelism architecture for the compute API. I'd
>>>>>>>>>> > also be interested if there are recommended approaches for
>>>>>>>>>> > inspecting the performance of threads used by Arrow: should I
>>>>>>>>>> > try to check a processor ID and infer performance, or are
>>>>>>>>>> > there particular tools that the community uses?
>>>>>>>>>> >
>>>>>>>>>> > Specifically, I am wondering if the concurrency is going to be
>>>>>>>>>> > different when using a ChunkedArray as an input compared to an
>>>>>>>>>> > Array, or for ChunkedArrays with various chunk counts (1 chunk
>>>>>>>>>> > vs. tens or hundreds). I see a large difference between the
>>>>>>>>>> > total time to apply compute functions to a single table
>>>>>>>>>> > (concatenated from many small tables) compared to applying
>>>>>>>>>> > compute functions to each sub-table in the composition. I'm
>>>>>>>>>> > trying to figure out where that difference may come from, and
>>>>>>>>>> > I'm wondering if it's related to parallelism within Arrow.
>>>>>>>>>> >
>>>>>>>>>> > I tried using the GitHub issues and JIRA issues (e.g. [1]) as
>>>>>>>>>> > a way to sleuth the info, but I couldn't find anything. The
>>>>>>>>>> > pyarrow API seems to have functions I could try to use to
>>>>>>>>>> > figure it out (cpu_count and set_cpu_count), but that seems
>>>>>>>>>> > like a vague road.
>>>>>>>>>> >
>>>>>>>>>> > [1]: https://issues.apache.org/jira/browse/ARROW-12726
>>>>>>>>>> >
>>>>>>>>>> > Thank you!
>>>>>>>>>> >
>>>>>>>>>> > Aldrin Montana
>>>>>>>>>> > Computer Science PhD Student
>>>>>>>>>> > UC Santa Cruz
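For reference, the C++ counterparts of pyarrow's cpu_count/set_cpu_count live in arrow/util/thread_pool.h; a minimal sketch (these size the global CPU thread pool, they don't profile individual threads):

    #include <arrow/status.h>
    #include <arrow/util/thread_pool.h>
    #include <cstdio>

    int main() {
      // What pyarrow exposes as cpu_count().
      std::printf("cpu threads: %d\n", arrow::GetCpuThreadPoolCapacity());

      // What pyarrow exposes as set_cpu_count().
      arrow::Status st = arrow::SetCpuThreadPoolCapacity(4);
      if (!st.ok()) {
        std::printf("failed: %s\n", st.ToString().c_str());
      }
      return 0;
    }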
