Thank you for providing such an interesting (and enlightening) reproduction. In both cases you are calling the compute functions in exactly the same way. For example, you are adding two chunked arrays in both cases. The key difference appears to be that you are making either 1 call to `Add` with a chunked array of 565 chunks, or 565 calls to `Add` with a chunked array of 1 chunk.
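To make the two call patterns concrete, here is a toy model in plain C++. It is only a sketch and does not use Arrow: `Chunk`, `ChunkedArray`, `ToyAdd`, `CallsByTable`, and `CallsBySlice` are hypothetical names invented for illustration, and the call counter merely stands in for whatever fixed per-call cost the real `Add` has.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Toy stand-ins, NOT Arrow types: a "chunk" is a vector of doubles and a
// "chunked array" is a vector of chunks.
using Chunk = std::vector<double>;
using ChunkedArray = std::vector<Chunk>;

// Counts how often ToyAdd is entered; a proxy for fixed per-call cost.
std::size_t g_add_calls = 0;

// Hypothetical element-wise add over two identically chunked inputs.
ChunkedArray ToyAdd(const ChunkedArray& lhs, const ChunkedArray& rhs) {
  ++g_add_calls;
  assert(lhs.size() == rhs.size());
  ChunkedArray out;
  out.reserve(lhs.size());
  for (std::size_t c = 0; c < lhs.size(); ++c) {
    assert(lhs[c].size() == rhs[c].size());
    Chunk sum(lhs[c].size());
    for (std::size_t i = 0; i < sum.size(); ++i) {
      sum[i] = lhs[c][i] + rhs[c][i];
    }
    out.push_back(std::move(sum));
  }
  return out;
}

// Pattern A ("by table"): one call on the whole chunked array.
std::size_t CallsByTable(const ChunkedArray& a, const ChunkedArray& b) {
  g_add_calls = 0;
  ToyAdd(a, b);
  return g_add_calls;
}

// Pattern B ("by slice"): one call per chunk, each on a 1-chunk array.
std::size_t CallsBySlice(const ChunkedArray& a, const ChunkedArray& b) {
  assert(a.size() == b.size());
  g_add_calls = 0;
  for (std::size_t c = 0; c < a.size(); ++c) {
    ToyAdd(ChunkedArray{a[c]}, ChunkedArray{b[c]});
  }
  return g_add_calls;
}
```

With two inputs of 565 chunks of 48 values each, `CallsByTable` reports 1 call and `CallsBySlice` reports 565, even though both patterns add exactly the same elements. Any cost that is paid once per call is therefore paid 565 times in the second pattern.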
One could theorize that the underlying code is somehow allocating or iterating more efficiently when given a large batch of input data. However, running through a profiler, that does not appear to be the case. Instead, I think the C++ implementation is simply inefficient at the moment and our per-call function overhead is too high. The cost is spread out very diffusely throughout the entire compute module. I have created [1] to get a better grasp on this. The details of the fixes can be left for JIRA and/or the dev@ mailing list, but reducing this overhead is pretty important for my current work, so I hope to be able to look into it. I'll also add that this is a pretty similar concern to [2], which Wes raised last summer.

-Weston

[1] https://issues.apache.org/jira/browse/ARROW-16014
[2] https://lists.apache.org/thread/mp68ofm2hnvs2v2oz276rvw7y5kwqoyd

On Sun, Mar 20, 2022 at 9:20 PM Aldrin <[email protected]> wrote:

> Sorry it took a while, but here is a repository I put together that should reproducibly illustrate what I am seeing, and what I'd like to understand better (if not improve) [1].
>
> The linked source code [2] shows 2 places where I am collecting times (using std::chrono::steady_clock in C++). The unexpected behavior is mentioned at the end of the README.md [3]:
>
> Applying a function on a Table: "Executed in 4.08 secs"
> Applying the function on each "table chunk": "Executed in 58.67 secs"
>
> For the test data I included in this repository, the Table has the following properties:
> * 27118 rows
> * 2152 columns
> * 565 "table chunks"
> * 48 rows per "table chunk"
>
> Every column of the table should be identically chunked, so a "table chunk" just refers to the same chunk of every column in the table. I call it a "table chunk" in this email because I'm not sure if it is the same as a RecordBatch or if RecordBatch lays the columns out differently.
> The reason each column is chunked identically is so that the whole table can be split across many key-values (written to a KeyValue store). In the code I provide in the repository, the table is decomposed via: Table -> TableReader -> RecordBatch -> Table::FromRecordBatches (vector of 1 RecordBatch) [4]. I have found that applying functions to Tables and to ChunkedArrays is more ergonomic than applying them to RecordBatches and Arrays (at least as of ~5.0.0).
>
> So, what I want to understand is: what contributes to the difference in performance?
>
> Thanks for any help or advice you can give!
>
> P.S. I will try to also add tensors to this for my own curiosity.
>
> [1]: https://github.com/drin/arrow-partial-aggr
> [2]: https://github.com/drin/arrow-partial-aggr/blob/mainline/src/simple.cpp#L108
> [3]: https://github.com/drin/arrow-partial-aggr/blob/mainline/README.md
> [4]: https://github.com/drin/arrow-partial-aggr/blob/mainline/src/simple.cpp#L123
>
> Aldrin Montana
> Computer Science PhD Student
> UC Santa Cruz
>
> On Fri, Mar 11, 2022 at 6:44 AM Niranda Perera <[email protected]> wrote:
>
>> Sorry, I think I missed the smart pointers in my response. They *should* be smart pointers, otherwise you'll lose the allocation when you go out of scope. It should have been,
>>
>>   class MeanAggr {
>>     int64_t count_;
>>     vector<shared_ptr<Array>> sums_;
>>     vector<shared_ptr<Array>> sum_squares_;
>>   };
>>
>> On Fri, Mar 11, 2022 at 3:16 AM Aldrin <[email protected]> wrote:
>>
>>> Actually, I think I understand now; I misread "extending the class members". But I think the point got across--if I know my table has a single chunk, then I can do the operations on the arrays and then I can wrap the result in a ChunkedArray or Table. For each slice, I can just maintain the results in a vector without smart pointers.
>>>
>>> I'll definitely try this. Thanks!
>>>
>>> On Thu, Mar 10, 2022 at 11:35 PM Aldrin <[email protected]> wrote:
>>>
>>>> I think there's one minor misunderstanding, but I like the essence of the feedback.
>>>>
>>>> To clarify, the MeanAggr::Accumulate function is used to gather over points of a sample, where a row is considered a sample, and columns are corresponding values, e.g.:
>>>>
>>>>   columns (values) | c0 | c1 | c2 | c3  | c4
>>>>   row 0 (sample 0) | 1  | 2  | 3  | 4   | 5
>>>>   row 1 (sample 1) | 1  | 4  | 27 | 256 | 3125
>>>>
>>>> For this tiny example, applying Accumulate "by slice" means that I apply it once on row 0, then again on row 1, and I add the times together. "By Table" means that I concatenate row 0 and row 1, then apply Accumulate on the resulting table. Combine isn't currently being considered (it's for when I split on columns). You can sort of see this in [1], but it also illustrates sequential calls of Accumulate instead of using Combine. I will explain this more in a reproducible example.
>>>>
>>>> Given the clarification, I am not sure if the suggested local calculations are helpful, but maybe you mean I shouldn't use so many shared pointers? Although, I do think I'll try reducing the code path by using Arrays when I'm applying to a Table that I know has only 1 chunk (because I have specified it that way). This seems like it should help isolate some of the overhead.
>>>>
>>>> Thanks for the feedback!
>>>>
>>>> [1]: https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/fb688531169421a5b5985d2cbfee100e793cae2f/resources/assets/TStatistic_Diagram.png
>>>>
>>>> On Thu, Mar 10, 2022 at 7:49 PM Niranda Perera <[email protected]> wrote:
>>>>
>>>>> Okay, one thing I immediately see is that, IMO, there are a lot of memory allocations/deallocations happening in the approach you have given. arrow::compute functions do not modify their inputs, so when you get an answer, it is freshly allocated in memory, and when you update an existing shared_ptr, you deallocate the previous buffers. This happens in both MeanAggr::Combine and MeanAggr::Accumulate, and it could be a reason why the split version is slower. The single-table version only has to go through MeanAggr::Accumulate.
>>>>>
>>>>> If I may suggest an alternative approach, I'd do this for the variance calculation:
>>>>>
>>>>>   class MeanAggr {
>>>>>     int64_t count_;
>>>>>     vector<Array> sums_;
>>>>>     vector<Array> sum_squares_;
>>>>>   };
>>>>>
>>>>> At every Accumulate, I will calculate local sums and sums of squares, and extend the class members with the resultant ChunkedArray's chunks (which are Arrays).
>>>>> At the end, I'll create some ChunkedArrays from these vectors, and use E(x^2)-E(x)^2 to calculate the variance. I feel like this might reduce the number of extra allocs and deallocs.
>>>>>
>>>>> On Thu, Mar 10, 2022 at 9:47 PM Aldrin <[email protected]> wrote:
>>>>>
>>>>>> You're correct with the first clarification. I am not (currently) slicing column-wise.
>>>>>>
>>>>>> And yes, I am calculating variance, mean, etc. so that I can calculate the t-statistic.
>>>>>>
>>>>>> On Thu, Mar 10, 2022 at 5:16 PM Niranda Perera <[email protected]> wrote:
>>>>>>
>>>>>>> Or are you slicing column-wise?
>>>>>>>
>>>>>>> On Thu, Mar 10, 2022 at 8:14 PM Niranda Perera <[email protected]> wrote:
>>>>>>>
>>>>>>>> From the looks of it, you are trying to calculate variance, mean, etc. over rows, is that right?
>>>>>>>>
>>>>>>>> I need to clarify a bit on this statement:
>>>>>>>> "Where "by slice" is total time, summed from running the function on each slice and "by table" is the time of just running the function on the table concatenated from each slice."
>>>>>>>> So, I assume you are originally using a `vector<shared_ptr<Table>> slices`. For the former case, you are passing each slice to `MeanAggr::Accumulate`, and for the latter case, you are calling arrow::Concatenate(slices) and passing the result as a single table?
>>>>>>>>
>>>>>>>> On Thu, Mar 10, 2022 at 7:41 PM Aldrin <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Oh, but the short answer is that I'm using: Add, Subtract, Divide, Multiply, Power, and Absolute. Sometimes both inputs are ChunkedArrays, and sometimes 1 input is a ChunkedArray and the other is a scalar.
>>>>>>>>>
>>>>>>>>> On Thu, Mar 10, 2022 at 4:38 PM Aldrin <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Niranda!
>>>>>>>>>>
>>>>>>>>>> Sure thing, I've linked to my code. [1] is essentially the function being called, and [2] is an example of a wrapper function (more in that file) I wrote to reduce boilerplate (to make [1] more readable).
>>>>>>>>>> But, now that I look at [2] again, which I wrote before I really knew much about smart pointers, I wonder if some of what I benchmarked is overhead from misusing C++ structures?
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>> [1]: https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/58839eb921c53d17ac32129be6af214ae4b58a13/src/cpp/processing/statops.cpp#L96
>>>>>>>>>> [2]: https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/58839eb921c53d17ac32129be6af214ae4b58a13/src/cpp/processing/numops.cpp#L18
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 10, 2022 at 4:30 PM Niranda Perera <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Aldrin,
>>>>>>>>>>>
>>>>>>>>>>> It would be helpful to know what sort of compute operators you are using.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 10, 2022, 19:12 Aldrin <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I will work on a reproducible example.
>>>>>>>>>>>>
>>>>>>>>>>>> As a sneak peek, what I was seeing was the following (pasted in gmail, see [1] for markdown version):
>>>>>>>>>>>>
>>>>>>>>>>>> Table ID       Columns  Rows   Rows (slice)  Slice count  Time (ms) total; by slice  Time (ms) total; by table
>>>>>>>>>>>> E-GEOD-100618  415      20631  299           69           644.065                    410
>>>>>>>>>>>> E-GEOD-76312   2152     27120  48            565          25607.927                  2953
>>>>>>>>>>>> E-GEOD-106540  2145     24480  45            544          25193.507                  3088
>>>>>>>>>>>>
>>>>>>>>>>>> Where "by slice" is total time, summed from running the function on each slice and "by table" is the time of just running the function on the table concatenated from each slice.
>>>>>>>>>>>>
>>>>>>>>>>>> The difference was large (but not *so* large) for ~70 iterations (1.5x); but for ~550 iterations (and 6x fewer rows per slice, 5x more columns) the difference became significant (~10x).
>>>>>>>>>>>>
>>>>>>>>>>>> I will follow up here when I have a more reproducible example. I also started doing this before tensors were available, so I'll try to see how that changes performance.
>>>>>>>>>>>>
>>>>>>>>>>>> [1]: https://gist.github.com/drin/4b2e2ea97a07c9ad54647bcdc462611a
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 10, 2022 at 2:32 PM Weston Pace <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> As far as I know (and my knowledge here may be dated) the compute kernels themselves do not do any concurrency. There are certainly compute kernels that could benefit from concurrency in this manner (many kernels naively so) and I think things are set up so that, if we decide to tackle this feature, we could do so in a systematic way (instead of writing something for each kernel).
>>>>>>>>>>>>>
>>>>>>>>>>>>> I believe that kernels, if given a unique kernel context, should be thread safe.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The streaming compute engine, on the other hand, does support concurrency. It is mostly driven by the scanner at the moment (e.g. each batch we fetch from the scanner gets a fresh thread task for running through the execution plan) but there is some intra-node concurrency in the hash join and (I think) the hash aggregate nodes.
>>>>>>>>>>>>> This has been sufficient to saturate cores on the benchmarks we run. I know there is ongoing interest in understanding and improving our concurrency here.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The scanner supports concurrency. It will typically fetch multiple files at once and, for each file, it will fetch multiple batches at once (assuming the file has more than one batch).
>>>>>>>>>>>>>
>>>>>>>>>>>>> > I see a large difference between the total time to apply compute functions to a single table (concatenated from many small tables) compared to applying compute functions to each sub-table in the composition.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Which one is better? Can you share a reproducible example?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Mar 10, 2022 at 12:01 PM Aldrin <[email protected]> wrote:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Hello!
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > I'm wondering if there's any documentation that describes the concurrency/parallelism architecture for the compute API. I'd also be interested if there are recommended approaches for seeing performance of threads used by Arrow--should I try to check a processor ID and infer performance or are there particular tools that the community uses?
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Specifically, I am wondering if the concurrency is going to be different when using a ChunkedArray as an input compared to an Array or for ChunkedArrays with various chunk sizes (1 chunk vs tens or hundreds).
>>>>>>>>>>>>> > I see a large difference between the total time to apply compute functions to a single table (concatenated from many small tables) compared to applying compute functions to each sub-table in the composition. I'm trying to figure out where that difference may come from and I'm wondering if it's related to parallelism within Arrow.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > I tried using the github issues and JIRA issues (e.g. [1]) as a way to sleuth the info, but I couldn't find anything. The pyarrow API seems to have functions I could try and use to figure it out (cpu_count and set_cpu_count), but that seems like a vague road.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > [1]: https://issues.apache.org/jira/browse/ARROW-12726
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Thank you!
>>>>>>>>
>>>>>>>> --
>>>>>>>> Niranda Perera
>>>>>>>> https://niranda.dev/
>>>>>>>> @n1r44 <https://twitter.com/N1R44>
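
As a footnote to the variance discussion quoted above (keep a count, sums, and sums of squares per Accumulate, then finish with E(x^2)-E(x)^2), here is a minimal sketch of that idea in plain C++. It is an assumption-laden illustration, not the code from either repository: it tracks a single column as scalars rather than Arrow Arrays, and `RunningMoments` and its members are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical running aggregate for ONE column of doubles.
// Each Accumulate call only updates three scalars, so no result
// buffers are allocated or freed per slice.
struct RunningMoments {
  std::int64_t count = 0;
  double sum = 0.0;
  double sum_squares = 0.0;

  // Fold one slice (e.g. one "table chunk" of this column) into the state.
  void Accumulate(const std::vector<double>& slice) {
    for (double x : slice) {
      sum += x;
      sum_squares += x * x;
    }
    count += static_cast<std::int64_t>(slice.size());
  }

  // E(x)
  double Mean() const {
    assert(count > 0);
    return sum / static_cast<double>(count);
  }

  // Population variance via E(x^2) - E(x)^2.
  double Variance() const {
    assert(count > 0);
    const double mean = Mean();
    return sum_squares / static_cast<double>(count) - mean * mean;
  }
};
```

Accumulating the slices {1, 2} and {3, 4} gives the same mean and variance as accumulating {1, 2, 3, 4} in one call, which is the property the "by slice" versus "by table" comparison relies on. One caveat: E(x^2)-E(x)^2 can lose precision to cancellation when the mean is large relative to the spread, so a production version may want a more stable update such as Welford's method.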
