GitHub user gitmodimo edited a discussion: Future plans for Acero

I would like to start a broad conversation on the future of Acero development. 
I have been using Acero for quite some time now and I think I have a general 
understanding of its current state. I do not use Substrait or Python. I use 
Acero strictly for streaming execution, and I have found it to be well 
designed and thought through. At first the concepts were overwhelming, but 
later on I found them all useful and powerful. However, along the way I spotted 
some hiccups, and with my colleagues we [fixed a few of 
them](https://github.com/search?q=repo%3Aapache%2Farrow+author%3Agitmodimo+author%3Amroz45++author%3AX-Lemon-X&type=pullrequests).

With all my experience with Acero, I have come to the conclusion that Acero 
needs some core changes in order to remain a versatile and extensible library 
for streaming execution. That is the general idea of this discussion: whether 
and how such core changes can be introduced. I would like to split the 
discussion into a few distinct topics.

In my understanding, Acero was initially designed to execute queries on Arrow 
datasets. Usually queries are not executed in an ordered fashion; ordering is 
optionally added as a final step (hence OrderBySinkNode). As such, Acero did not 
tackle source ordering. Later on, the additional concept of batch.index was 
introduced, which paved the way for maintaining and leveraging source ordering 
(Topic 1).

All queries that I know of have only one output, and as expected all ExecNodes 
have only one output. I am aware that Acero's foundation does not prohibit 
multiple outputs in general, but as of now there are no multi-output nodes 
(Topic 2).

With the current state of Acero, a few coding patterns recur. I think they 
should be considered for factoring out, to remove code duplication and to 
simplify maintenance (Topic 3).

1. Ordering/Backpressure

Since the introduction of batch.index, not all exec nodes comply with these new 
semantics, even though multiple exec nodes _do_ rely on data order. The most 
notable are:
- asof_join: ordering done in GH-41706; backpressure pending merge in GH-46421
- sorted_merge: ~PR ready GH-47269~ not ready
- aggregate: ~PR ready GH-47269 - ordering needed conditionally~ not ready

In addition to those nodes, all source and sink nodes now need to account for 
the ordering concept and for the user's intention to maintain the ordering of 
the source.

The solution for all the nodes that require ordering was to introduce 
SerialSequencingQueue. Although it fixes ordering, it unfortunately breaks 
backpressure (SerialSequencingQueue does not limit how many items are queued). 
To fix backpressure, SerialSequencingQueue has to produce its own pause signal 
and also propagate pause from downstream. I think this logic becomes too 
convoluted to replicate in every ExecNode. Since ordering is now a global 
concept, I think we should move the validation of ordering, the sequencing and 
the backpressure logic out of specific ExecNodes and into the ExecNode base. 
This would let implementers of new exec nodes focus on the actual data 
processing and reuse the input and output access patterns that have already 
emerged.
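
To make the backpressure problem concrete, here is a minimal, single-threaded C++ sketch of a sequencing queue that produces its own pause signal and also honors a pause coming from downstream. It is not Acero code: `Batch`, `BoundedSequencer`, the `pause_at`/`resume_at` thresholds and both callbacks are hypothetical, chosen only to show the kind of logic that I think currently has to be replicated in every ordering-aware ExecNode.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <utility>

// Hypothetical stand-in for an ExecBatch; only the sequence index matters here.
struct Batch {
  int index;
  int payload;
};

// Hypothetical sequencing queue with built-in backpressure (single-threaded,
// no locking). This is NOT Acero's SerialSequencingQueue.
class BoundedSequencer {
 public:
  BoundedSequencer(std::size_t pause_at, std::size_t resume_at,
                   std::function<void(const Batch&)> emit,
                   std::function<void(bool)> set_upstream_paused)
      : pause_at_(pause_at),
        resume_at_(resume_at),
        emit_(std::move(emit)),
        set_upstream_paused_(std::move(set_upstream_paused)) {
    assert(resume_at_ < pause_at_);  // hysteresis band must be non-empty
  }

  // Upstream delivers batches here, possibly out of order.
  void Insert(const Batch& batch) {
    pending_.emplace(batch.index, batch);
    Drain();
    UpdatePause();
  }

  // Downstream pauses/resumes us. While paused nothing is emitted, so the
  // queue fills up and the pause reaches upstream through UpdatePause().
  void SetDownstreamPaused(bool paused) {
    downstream_paused_ = paused;
    Drain();
    UpdatePause();
  }

  bool upstream_paused() const { return upstream_paused_; }
  std::size_t queued() const { return pending_.size(); }

 private:
  // Emit batches strictly in index order, stopping at the first gap.
  void Drain() {
    while (!downstream_paused_) {
      auto it = pending_.find(next_index_);
      if (it == pending_.end()) break;
      emit_(it->second);
      pending_.erase(it);
      ++next_index_;
    }
  }

  // Own pause signal with hysteresis: pause at pause_at_, resume at resume_at_.
  void UpdatePause() {
    bool paused = upstream_paused_;
    if (!paused && pending_.size() >= pause_at_) {
      paused = true;
    } else if (paused && pending_.size() <= resume_at_) {
      paused = false;
    }
    if (paused != upstream_paused_) {
      upstream_paused_ = paused;
      set_upstream_paused_(paused);
    }
  }

  std::size_t pause_at_;
  std::size_t resume_at_;
  std::function<void(const Batch&)> emit_;
  std::function<void(bool)> set_upstream_paused_;
  std::map<int, Batch> pending_;  // out-of-order batches keyed by index
  int next_index_ = 0;
  bool downstream_paused_ = false;
  bool upstream_paused_ = false;
};
```

A real implementation would also need locking, and it has to handle one tricky case: if the queue pauses upstream while it is still waiting for a missing index, and that missing batch is itself held back by the pause, the plan can stall.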

As an extra feature, ordering could offer additional “stream” guarantees. A 
stream guarantee would carry a condition that is guaranteed to be true for the 
rest of the data stream (like “timestamp > x”). This could be used to advance 
the timeline/segments in order-leveraging exec nodes.
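
A minimal sketch of how a downstream node could use such a guarantee, assuming (hypothetically) that a guarantee is just a lower bound on a timestamp column and that the node groups rows into fixed-width time segments. `StreamGuarantee` and `SegmentFlusher` do not exist in Acero; the point is only that a segment can be finalized as soon as the guarantee proves no further row can fall into it, without waiting for the end of the stream.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <utility>
#include <vector>

// Hypothetical stream guarantee: every row still to come has timestamp > bound.
struct StreamGuarantee {
  std::int64_t timestamp_lower_bound;
};

// Hypothetical order-leveraging consumer that counts rows per fixed-width time
// segment and finalizes a segment once a guarantee proves it cannot grow.
class SegmentFlusher {
 public:
  explicit SegmentFlusher(std::int64_t segment_width) : width_(segment_width) {
    assert(width_ > 0);
  }

  void AddRow(std::int64_t timestamp) {
    assert(timestamp >= 0);  // keeps integer division equal to floor division
    ++counts_[timestamp / width_];
  }

  // Returns (segment start, row count) for every segment that is now final.
  std::vector<std::pair<std::int64_t, std::int64_t>> OnGuarantee(
      const StreamGuarantee& g) {
    std::vector<std::pair<std::int64_t, std::int64_t>> finished;
    while (!counts_.empty()) {
      auto it = counts_.begin();  // std::map visits segments in time order
      std::int64_t last_ts = (it->first + 1) * width_ - 1;
      // Future rows have timestamp > bound, so a segment whose last timestamp
      // is <= bound can never receive another row.
      if (last_ts > g.timestamp_lower_bound) break;
      finished.emplace_back(it->first * width_, it->second);
      counts_.erase(it);
    }
    return finished;
  }

 private:
  std::int64_t width_;
  std::map<std::int64_t, std::int64_t> counts_;  // segment id -> row count
};
```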

2. Multiple outputs

Multiple-output ExecNodes are mentioned in several places across the 
documentation and issues, but no such node was ever implemented. In my 
application I found it necessary to produce multiple outputs from a single 
processing pipeline. In the beginning the dataset `tee` node was enough, but 
along the way it turned out not to be flexible enough. Somewhat inspired by the 
implementation of the tee node, I created a new 
[pipe](https://github.com/apache/arrow/pull/45435) concept that fits Acero 
quite well and provides an elegant alternative to multiple-output nodes. In 
summary, there are three new nodes:
- `pipe_sink`: consumes all exec batches and replicates them to all 
`pipe_source` nodes
- `pipe_source`: a source node that receives batches
- `pipe_tee`: replicates batches to `pipe_source` nodes and to its own output

All pipe nodes have names, and sinks are connected by name with sources at the 
init stage. Additionally, a `pipe` can be instantiated as an element in an exec 
node to provide additional outputs (for example, a “filter” node can provide an 
additional output of the filtered _out_ data). I have been using this concept 
extensively in my processing pipeline, and I have now completely stopped using 
the `tee` node.
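
The core of the name-based wiring can be sketched in a few lines of self-contained C++. This is a simplified illustration, not the code from the linked PR: `PipeRegistry`, `PipeSink`, `PipeTee` and `Consumer` are hypothetical names, batches are reduced to a small struct, and the registry stands in for whatever lookup the plan performs at the init stage.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an ExecBatch.
struct Batch {
  int index;
  int payload;
};

using Consumer = std::function<void(const Batch&)>;

// Hypothetical plan-wide registry: pipe sources announce themselves by name,
// and pipe sinks look those names up once, at init stage.
class PipeRegistry {
 public:
  void RegisterSource(const std::string& name, Consumer source) {
    sources_[name].push_back(std::move(source));
  }
  std::vector<Consumer> Lookup(const std::string& name) const {
    auto it = sources_.find(name);
    return it == sources_.end() ? std::vector<Consumer>{} : it->second;
  }

 private:
  std::map<std::string, std::vector<Consumer>> sources_;
};

// pipe_sink: replicates every batch it consumes to all sources of its name.
class PipeSink {
 public:
  PipeSink(const PipeRegistry& registry, std::string name)
      : registry_(registry), name_(std::move(name)) {}

  // Connect by name; must run after all sources have registered.
  void Init() { targets_ = registry_.Lookup(name_); }

  void InputReceived(const Batch& batch) {
    for (const Consumer& target : targets_) target(batch);
  }

 private:
  const PipeRegistry& registry_;
  std::string name_;
  std::vector<Consumer> targets_;
};

// pipe_tee: the same replication, plus forwarding to the node's own output.
class PipeTee {
 public:
  PipeTee(const PipeRegistry& registry, std::string name, Consumer output)
      : sink_(registry, std::move(name)), output_(std::move(output)) {}

  void Init() { sink_.Init(); }

  void InputReceived(const Batch& batch) {
    sink_.InputReceived(batch);
    output_(batch);
  }

 private:
  PipeSink sink_;
  Consumer output_;
};
```

Because sinks only resolve their targets in `Init()`, the sketch does not care in which order pipe nodes are declared, as long as every source has registered before the sinks are initialized.
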

The reason I think pipes are a strong alternative to multi-output nodes is that 
they fit elegantly into the entire `Declaration` infrastructure. With single 
outputs, a declaration is always a tree. Multiple outputs would create a 
directed graph, which would require changing literally the entire ExecPlan 
building infrastructure. With pipes we get effectively the same functionality 
in a way that [fits the current ExecPlan building 
process](https://github.com/apache/arrow/blob/e5e03a22c0c0c96e0e153c4bd3d056b2bfb612e8/cpp/src/arrow/acero/pipe_node_test.cc#L205-L251).

3. Refactoring

I see several changes that would benefit Acero as a whole:
- Factor out the handling of input. All non-source exec nodes implement the 
same input_counter_ finish logic. I propose we move this to the base ExecNode.
- Create ExecNodeInputAdapters that would sequence input when needed, with two 
different access patterns: push-based and pull-based.
- Unify backpressure handling and move it into the input adapters.
- Factor out the outputting of ExecBatch into an ExecNodeOutputAdapter.
- Factor out StopProducing and handle the stream InputFinished logic in 
ExecNodeOutputAdapter.
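
As an illustration of the first item, here is a sketch of the finish bookkeeping pulled into a base class, so that a concrete node only implements a single end-of-stream hook. It assumes, as Acero's InputFinished does, that each input eventually reports the total number of batches it sent; the class name `InputFinishTracker` and the hook `OnAllInputsFinished` are hypothetical and not part of Acero's ExecNode.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical base-class helper (not Acero's ExecNode) that owns the
// "have all inputs finished?" bookkeeping every non-source node repeats today.
class InputFinishTracker {
 public:
  explicit InputFinishTracker(std::size_t num_inputs)
      : received_(num_inputs, 0), expected_(num_inputs, -1) {}
  virtual ~InputFinishTracker() = default;

  // Record one delivered batch on `input`.
  void InputReceived(std::size_t input) {
    bool done;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      ++received_.at(input);
      done = CheckDoneLocked();
    }
    if (done) OnAllInputsFinished();  // invoked outside the lock
  }

  // Upstream announces how many batches `input` delivers in total.
  void InputFinished(std::size_t input, int total_batches) {
    bool done;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      assert(expected_.at(input) < 0 && "InputFinished called twice");
      expected_.at(input) = total_batches;
      done = CheckDoneLocked();
    }
    if (done) OnAllInputsFinished();
  }

 protected:
  // The only end-of-stream hook a concrete node would have to implement.
  virtual void OnAllInputsFinished() = 0;

 private:
  // Returns true exactly once: when every input has finished and delivered
  // all the batches it announced.
  bool CheckDoneLocked() {
    if (done_) return false;
    for (std::size_t i = 0; i < expected_.size(); ++i) {
      if (expected_[i] < 0 || received_[i] < expected_[i]) return false;
    }
    done_ = true;
    return true;
  }

  std::mutex mutex_;
  std::vector<int> received_;
  std::vector<int> expected_;  // -1 until InputFinished is called
  bool done_ = false;
};
```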

I am not a maintainer of Arrow, nor am I affiliated with Apache in any way, but 
I hope this discussion produces some kind of general roadmap for the future 
development of Acero. All this might seem like an extreme makeover, but I 
honestly believe it is the shortest path to fixing the current issues with 
backpressure and, as a bonus, to really cleaning up and simplifying the Acero 
codebase. I am ready to invest some time into it, but first I want to know 
whether this aligns with the maintainers' plans and vision.

GitHub link: https://github.com/apache/arrow/discussions/47331

----
This is an automatically sent email for user@arrow.apache.org.