GitHub user gitmodimo created a discussion: Future plans for Acero
I would like to start a broad conversation on the future of Acero development. I have been using Acero for quite some time now, and I think I have a general understanding of its current state. I use neither Substrait nor Python; I use Acero strictly for streaming execution, and I have found it to be well designed and thought through. At first its concepts were overwhelming, but later on I found them all useful and powerful. Along the way, however, I spotted some hiccups, and together with my colleagues we [fixed a few of them](https://github.com/search?q=repo%3Aapache%2Farrow+author%3Agitmodimo+author%3Amroz45++author%3AX-Lemon-X&type=pullrequests).

From my experience with Acero, I have concluded that it needs some core changes in order to remain a versatile and extensible library for streaming execution. That is the general idea of this discussion: whether and how such core changes can be introduced. I would like to split the discussion into a few distinct topics.

As I understand it, Acero was initially designed to execute queries on Arrow datasets. Usually queries are not executed in an ordered fashion; ordering is optionally added as a final step (hence OrderBySinkNode). As such, Acero did not tackle source ordering. Later on, the concept of batch.index was introduced, which paved the way for maintaining and leveraging source ordering (Topic 1).

All queries that I know of have only one output, and, as expected, all ExecNodes have only one output. I am aware that Acero's foundation does not prohibit multiple outputs in general, but as of now there are no multi-output nodes (Topic 2).

In the current state of Acero, a few coding patterns recur. I think they should be considered for factoring out, to remove code duplication and to simplify maintenance (Topic 3).

1. Ordering/Backpressure

Since the introduction of batch.index, not all exec nodes comply with the new semantics, even though multiple exec nodes _do_ rely on data order.
Most notable:

- asof_join: done (GH-41706)
- sorted_merge: PR ready (GH-47269)
- aggregate: PR ready (GH-47269); ordering needed conditionally

In addition to those nodes, all source and sink nodes now need to account for the ordering concept and for the user's intention to maintain the ordering of the source.

The solution adopted for all of the nodes that require ordering was to introduce SerialSequencingQueue. Although it fixes ordering, it unfortunately breaks backpressure, because SerialSequencingQueue does not limit how many items are queued. To fix backpressure, SerialSequencingQueue has to produce its own pause signal and also propagate pauses coming from downstream. I think this logic becomes too convoluted to replicate in every ExecNode. Since ordering is now a global concept, I think we should move the ordering validation, sequencing, and backpressure logic out of specific ExecNodes and into the ExecNode base. This would let the implementer of a new exec node focus on the actual data processing and reuse the input and output access patterns that have already emerged.

As an extra feature, Ordering could offer additional "stream" guarantees. A stream guarantee would be a condition that is guaranteed to hold for the rest of the data stream (like `timestamp > x`). This could be used to push the timeline/segment forward in exec nodes that leverage ordering.

2. Multiple outputs

Multi-output ExecNodes are mentioned in several places across the documentation and issues, but no such node has ever been implemented. In my application, I found it necessary to produce multiple outputs from a single processing pipeline. In the beginning the dataset `tee` node was enough, but along the way it turned out not to be flexible enough. Somewhat inspired by the implementation of the tee node, I created a new [pipe](https://github.com/apache/arrow/pull/45435) concept that fits Acero quite well and provides a rather elegant alternative to multi-output nodes.
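To make the name-based wiring of the pipe concept concrete, here is a minimal standalone C++ sketch of the idea. It is not the code from the linked PR. `Batch`, `Consumer`, `PipeRegistry`, `FilterNonNegative`, `AppendTo`, and `Demo` are hypothetical names, and real Acero types such as `ExecBatch` and `ExecNode` are deliberately not modeled. A sink pushes each batch to every source registered under the same name. A filter keeps its single regular output and uses an embedded pipe to expose the rows it filtered out.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an ExecBatch.
struct Batch {
  int index;
  std::vector<int> values;
};

using Consumer = std::function<void(const Batch&)>;

// Hypothetical registry (not an Arrow API) that wires pipe sinks to pipe
// sources by name, standing in for the plan's init stage.
class PipeRegistry {
 public:
  // A pipe_source subscribes to a named pipe.
  void AddSource(const std::string& name, Consumer consumer) {
    sources_[name].push_back(std::move(consumer));
  }

  // A pipe_sink or pipe_tee replicates each batch to every source of the pipe.
  void Push(const std::string& name, const Batch& batch) const {
    auto it = sources_.find(name);
    if (it == sources_.end()) {
      throw std::runtime_error("no pipe_source named " + name);
    }
    for (const Consumer& consumer : it->second) consumer(batch);
  }

 private:
  std::map<std::string, std::vector<Consumer>> sources_;
};

// Hypothetical filter with one regular output plus an embedded pipe that
// carries the rows it filtered out.
void FilterNonNegative(const Batch& in, const Consumer& output,
                       const PipeRegistry& pipes, const std::string& reject_pipe) {
  Batch kept{in.index, {}};
  Batch rejected{in.index, {}};
  for (int v : in.values) (v >= 0 ? kept : rejected).values.push_back(v);
  output(kept);                       // the node's single regular output
  pipes.Push(reject_pipe, rejected);  // extra output through the pipe
}

// Returns a consumer that appends every received value to *dst.
Consumer AppendTo(std::vector<int>* dst) {
  return [dst](const Batch& b) {
    dst->insert(dst->end(), b.values.begin(), b.values.end());
  };
}

// Usage: two pipe_sources subscribe to the same "rejected" pipe.
void Demo() {
  PipeRegistry pipes;
  std::vector<int> main_out, audit, metrics;
  pipes.AddSource("rejected", AppendTo(&audit));
  pipes.AddSource("rejected", AppendTo(&metrics));
  FilterNonNegative(Batch{0, {3, -1, 4, -5}}, AppendTo(&main_out), pipes, "rejected");
  assert((main_out == std::vector<int>{3, 4}));
  assert((audit == std::vector<int>{-1, -5}));
  assert(audit == metrics);
}
```

To add another consumer of the rejected rows, you only register another source under the same name. The filter itself still has exactly one regular output, which is why pipes can leave a tree-shaped `Declaration` intact.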
In summary, there are three new nodes:

- `pipe_sink`: consumes all exec batches and replicates them to all `pipe_source` nodes
- `pipe_source`: a source node that receives batches
- `pipe_tee`: replicates batches to `pipe_source` nodes and to its own output

All pipe nodes have names, and sinks are connected to sources by name at the init stage. Additionally, a `pipe` can be instantiated as an element inside an exec node to provide additional outputs (for example, a "filter" node can provide an additional output of the data it filtered _out_). I have been using this concept extensively in my processing pipeline, and I have now completely stopped using the `tee` node.

I think pipes are a strong alternative to multi-output nodes because they fit elegantly into the entire `Declaration` infrastructure. With single outputs, a declaration is always a tree. Multiple outputs would create a directed graph, which would require changing literally the entire ExecPlan building infrastructure. With pipes we get effectively the same functionality in a way that [fits the current ExecPlan building process](https://github.com/apache/arrow/blob/e5e03a22c0c0c96e0e153c4bd3d056b2bfb612e8/cpp/src/arrow/acero/pipe_node_test.cc#L205-L251).

3. Refactoring

I see several changes that would benefit Acero as a whole:

- Factor out input handling. All non-source exec nodes implement the same `input_counter_` finish logic. I propose we move this to the base ExecNode.
- Create ExecNodeInputAdapters that would sequence input when needed, with two different access patterns: push-based and pull-based.
- Unify backpressure handling and move it into the input adapters.
- Factor out the outputting of ExecBatches into an ExecNodeOutputAdapter.
- Factor out StopProducing and handle the stream's InputFinished logic in ExecNodeOutputAdapter.

I am not a maintainer of Arrow, nor am I affiliated with Apache in any way, but I hope this discussion produces some kind of general roadmap for the future development of Acero.
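The sequencing and backpressure behavior proposed for an input adapter, which is also the SerialSequencingQueue fix described in Topic 1, can be sketched in standalone C++. This is a minimal, single-threaded illustration and not Acero's SerialSequencingQueue. `Batch`, `BoundedSequencer`, `Demo`, and the high/low watermark parameters are hypothetical. A real ExecNode would need locking and would connect the callbacks to Acero's own pause/resume mechanism. The sketch also assumes the source assigns `index` values in order. Under that assumption, the batch that fills a gap is always already in flight when upstream is paused. Without it, pausing upstream could deadlock the queue.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <utility>
#include <vector>

// Hypothetical stand-in for an ExecBatch; `index` plays the role of batch.index.
struct Batch {
  int index;
  int payload;
};

// Hypothetical input adapter (not an Arrow API) that
//  1) releases batches downstream strictly in index order,
//  2) pauses upstream once `high_water` batches are buffered and resumes it
//     when the buffer drops to `low_water`,
//  3) stops releasing while downstream is paused, so downstream backpressure
//     propagates upstream through (2).
// Single-threaded for clarity; a real ExecNode would need a mutex.
class BoundedSequencer {
 public:
  using Emit = std::function<void(const Batch&)>;
  using Signal = std::function<void()>;

  BoundedSequencer(std::size_t high_water, std::size_t low_water, Emit emit,
                   Signal pause_upstream, Signal resume_upstream)
      : high_water_(high_water),
        low_water_(low_water),
        emit_(std::move(emit)),
        pause_upstream_(std::move(pause_upstream)),
        resume_upstream_(std::move(resume_upstream)) {
    assert(low_water_ < high_water_);
  }

  // Called by upstream for every batch, possibly out of order.
  void Insert(const Batch& batch) {
    assert(batch.index >= next_index_);  // each index is delivered once
    pending_.emplace(batch.index, batch);
    Drain();
    UpdateUpstream();
  }

  void PauseFromDownstream() { downstream_paused_ = true; }

  void ResumeFromDownstream() {
    downstream_paused_ = false;
    Drain();
    UpdateUpstream();
  }

  std::size_t buffered() const { return pending_.size(); }

 private:
  // Release the run of consecutive indices starting at next_index_.
  void Drain() {
    while (!downstream_paused_ && !pending_.empty() &&
           pending_.begin()->first == next_index_) {
      emit_(pending_.begin()->second);
      pending_.erase(pending_.begin());
      ++next_index_;
    }
  }

  // Hysteresis between the two watermarks avoids pause/resume flapping.
  void UpdateUpstream() {
    if (!upstream_paused_ && pending_.size() >= high_water_) {
      upstream_paused_ = true;
      pause_upstream_();
    } else if (upstream_paused_ && pending_.size() <= low_water_) {
      upstream_paused_ = false;
      resume_upstream_();
    }
  }

  std::size_t high_water_;
  std::size_t low_water_;
  Emit emit_;
  Signal pause_upstream_;
  Signal resume_upstream_;
  std::map<int, Batch> pending_;  // buffered batches, keyed by index
  int next_index_ = 0;
  bool downstream_paused_ = false;
  bool upstream_paused_ = false;
};

// Usage: index 0 arrives last, so 1..3 are buffered and upstream is paused.
void Demo() {
  std::vector<int> out;
  int pauses = 0, resumes = 0;
  BoundedSequencer seq(
      3, 1, [&](const Batch& b) { out.push_back(b.index); },
      [&] { ++pauses; }, [&] { ++resumes; });
  seq.Insert({2, 20});
  seq.Insert({3, 30});
  seq.Insert({1, 10});  // 3 buffered, index 0 missing: pause upstream
  assert(pauses == 1 && out.empty());
  seq.Insert({0, 0});   // gap filled: 0..3 released in order, resume upstream
  assert(resumes == 1 && seq.buffered() == 0);
  assert((out == std::vector<int>{0, 1, 2, 3}));
}
```

The pause threshold, the propagation of downstream pauses, and the in-order release all depend on one another. Implementing that interaction once in a shared adapter is easier to get right than repeating it in every node.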
All this might seem like an extreme makeover, but I honestly believe it is the shortest path to fixing the current backpressure issues and, as a bonus, to really cleaning up and simplifying the Acero codebase. I am ready to invest some time in it, but first I want to know whether this fits the maintainers' plans and vision.

GitHub link: https://github.com/apache/arrow/discussions/47331