Re: [DISCUSS] batch ownership
I missed this as a discussion since it had the title of a GitHub discussion. Comments below.

On Friday, April 27, 2018, 5:42:37 PM PDT, salim achouche wrote:

> Another point, I don't see a functional benefit from avoiding a change of ownership for pass-through operators.

Please read my responses to Vlad. Change of ownership is critical to how Drill's memory allocators work today. Of course, you are right that, if we could do a new design (perhaps based on the budget-based approach), we would not need the ownership stuff. But, without ownership changes now, the existing allocators will simply cause us all manner of problems. In particular, none of the spill logic added to Sort or HashAgg would work, as it relies on a properly functioning allocator.

> Consider the following use-cases:
> Example I -
> - Single batch of size 8MB is received at time t0 and then is passed through a set of pass-through operators
> - At time t1 owned by operator Opr1, time t2 owned by operator t2, and so forth
> - Assume we report memory usage at time t0 - t2; this is what will be seen
> - t0: (fragment, opr-1, opr-2) = (8MB, 0, 0)
> - t1: (fragment, opr-1, opr-2) = (0, 8MB, 0)
> - t2: (fragment, opr-1, opr-2) = (0, 0, 8MB)

You are right. Each minor fragment is single-threaded: only one operator is "active" at a time as control passes from downstream to upstream operators. (Yes, this is the unfortunate Drill terminology: downstream calls upstream, and data flows in the direction opposite to calls.) This single-threaded model is the insight behind the budget-based memory model. But, to get there, we must consider the whole system; we can't just make localized changes, unfortunately.
> Example II -
> - Multiple batches of size 8MB are received at time t0 - t2 and then is passed through a set of pass-through operators
> - At time t1 owned by operator Opr1, time t2 owned by operator t2, and so forth
> - Assume we report memory usage at time t0 - t2; this is what will be seen
> - t0: (fragment, opr-1, opr-2) = (8MB, 0, 0)
> - t1: (fragment, opr-1, opr-2) = (8MB, 8MB, 0)
> - t2: (fragment, opr-1, opr-2) = (8MB, 8MB, 8MB)

The above can, AFAIK, never happen. A batch is owned by an operator, not a fragment. A batch passes up the operator tree until it reaches the top or until it reaches a "buffering" operator such as Sort.

> The key thing is that we clarify our reporting metrics so that users do not make the wrong conclusions.

This is a good thing. But, we need to understand how the batches flow and report that accurately. Further, we must deeply understand this flow if we want to move to budget-based allocation without per-operator allocators.

Let's separate various concepts. First is the instantaneous "stats" maintained by each operator allocator to enforce memory limits. Second is the total data that has passed through an operator. Third is the maximum memory used at any one time over the life of the operator. These are all very useful, but they measure different things.

Thanks,
- Paul
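To make the three measurements named above concrete, here is a minimal sketch in Java. It is a toy model only: `OperatorLedger` and `LedgerDemo` are hypothetical names invented for this illustration and are not Drill's allocator API. It replays the Example I flow (one 8 MB batch handed from one pass-through operator to the next) and keeps the instantaneous, peak, and cumulative figures apart.

```java
/** Toy stand-in for a per-operator allocator. Hypothetical; not Drill's BufferAllocator. */
final class OperatorLedger {
    private final String name;
    private long current; // bytes owned right now (what a limit check would look at)
    private long peak;    // largest value that `current` has ever reached
    private long total;   // cumulative bytes that have passed through this operator

    OperatorLedger(String name) { this.name = name; }

    /** Account for a batch this operator now owns. */
    void take(long bytes) {
        current += bytes;
        total += bytes;
        peak = Math.max(peak, current);
    }

    /** Account for a batch this operator has handed on or freed. */
    void give(long bytes) {
        if (bytes > current) {
            throw new IllegalStateException(name + " is releasing more than it owns");
        }
        current -= bytes;
    }

    /** Move ownership of one batch from this ledger to another. */
    void transferTo(OperatorLedger target, long bytes) {
        give(bytes);
        target.take(bytes);
    }

    long current() { return current; }
    long peak() { return peak; }
    long total() { return total; }
}

final class LedgerDemo {
    static final long MB = 1024L * 1024L;

    /** Pass `batches` batches of 8 MB, one at a time, through two pass-through operators. */
    static OperatorLedger[] runPassThrough(int batches) {
        OperatorLedger opr1 = new OperatorLedger("opr-1");
        OperatorLedger opr2 = new OperatorLedger("opr-2");
        for (int i = 0; i < batches; i++) {
            opr1.take(8 * MB);             // t1: opr-1 owns the batch
            opr1.transferTo(opr2, 8 * MB); // t2: opr-2 owns it; opr-1 owns nothing
            opr2.give(8 * MB);             // the batch leaves opr-2
        }
        return new OperatorLedger[] { opr1, opr2 };
    }

    public static void main(String[] args) {
        for (OperatorLedger l : runPassThrough(3)) {
            System.out.println(l.current() / MB + " MB now, "
                + l.peak() / MB + " MB peak, " + l.total() / MB + " MB total");
        }
    }
}
```

In this model, a pass-through operator's peak stays at one batch however many batches flow through it, while its cumulative total keeps growing. That is why the figures need separate names in any report.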
Re: [DISCUSS] batch ownership
Hi Vlad,

More responses.

> The same approach [as for internal operators] applies to senders and receivers. Senders gets batches from the upstream operators taking ownership of those batches and send data to receivers.

Senders receive data from an "upstream" operator, then serialize it over the wire. As a result, Senders take ownership from the upstream operator, but then must transfer ownership to Netty.

Here I'll speculate. I believe that we create a Netty composite buffer that strings together the buffers that underlie the value vectors in the outgoing record batch. (Yes, there are many layers in play.) Netty does not know about our allocator model. It does, however, have a reference count. So, my guess is that the Sender somehow gives up ownership of the outgoing buffer in the sense of the Drill allocator, but lets Netty drop the reference count once Netty has sent the buffer. I believe you are quite familiar with Netty, so perhaps you can dig around here and explain how this actually works.

> Receivers get data from senders and reconstruct record batches.

You are right logically. But, physically there is a difference. Data arrives via Netty, which allocates buffers for the data. Receivers take these raw buffers and turn them into batches.

Here things get even more complex (if that is possible.) The Receiver creates multiple vectors on top of a single Netty buffer. That is, multiple vectors were serialized together and were read together. Much of the complexity of Drill's memory model comes from the ability to create multiple (logical) DrillBufs on top of a single (physical) Netty buffer. This is where we need reference counts (so we know when the last shared use goes away), and where we need the UDLE/DrillBuf separation. So, again, Netty does not play the Drill "ownership" game; it only does reference counts.
So the Receiver must convert from the Netty reference count of the big incoming buffer, to reference counts for each materialized vector, and create some kind of entry in Drill's allocator. I'm not sure how this is done; it would be great if you could figure this out.

Could this be done differently? Probably. Maybe serialize each buffer by itself so that Netty creates separate buffers for each. I'd guess the original authors started with this design and moved to the present one, perhaps for performance reasons. (Anyone know of the history here?)

> It is the business logic of senders and receivers and they may rely on other libraries (rpc and netty) or classes to handle serialization/de-serialization, buffering, acknowledgment, back-pressure or dealing with network. From other Drill operators point of view, senders and receivers are operators responsible for passing record batches from one drillbit to another.

True. Senders/Receivers should speak Drill operator protocol on one side, Netty protocol on the other. They are adapters. Is this not what you see?

> Following your approach it is necessary to modify MergingReceiver as well. It also pulls batches from a queue (see MergingRecordBatch.getNext()), but instead of almost immediately passing it to a next operator as UnorderReceiver does, MergingReceiver creates a new record batch from those batches that it pulls from the queue. To be consistent with proposed changes to UnorderReceiver, it is necessary to change the ownership of batches that MergingReceiver pulls as well especially that MergingReciver may keep reference to the original batch much longer compared to UnorderedReceiver (while it waits for batches from other drillbits).

I personally don't know the details. But, in general, if one operator passes data to another, it should play by the Drill ownership rules if it works with vectors. If, instead, it works with buffers, then it should probably play by the Netty rules.
> I don't see a reason to modify both UnorderedReceiver and MergingReceiver, instead, I think, we should modify allocator used when batches are created in the first place before they are added to a queue.

My own suggestion here is that we may want to make use of an old-school technique that is still often handy: write up the design. Document the rules I've been doing my best to explain above. Add a detailed explanation of how Drill interfaces with Netty. Then, think through how we want to handle the Drill-operator-to-Netty interface.

Another particularly nasty area is the "Mux" operators. Several folks struggled to understand them and didn't get very far. This is not a good state to be in. We should really understand how they work. Perhaps understanding the most complex case will help shed light on the case under discussion.

Thanks,
- Paul
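The reference-counting idea described in this message (several logical buffers carved out of one physical incoming buffer, with the memory freed only when the last user lets go) can be sketched as follows. This is a toy model under stated assumptions: `PhysicalBuffer`, `LogicalSlice`, and `ReceiverSketch` are hypothetical classes written for this illustration. They are not Netty's or Drill's classes, and the sketch says nothing about how Drill's allocator ledger is actually updated.

```java
/** Toy physical buffer with a reference count. Hypothetical stand-in for a Netty buffer. */
final class PhysicalBuffer {
    private final byte[] data;
    private int refCount = 1; // the creator holds the first reference
    private boolean freed;

    PhysicalBuffer(int size) { data = new byte[size]; }

    void retain() {
        checkLive();
        refCount++;
    }

    /** Drop one reference; returns true if this call freed the memory. */
    boolean release() {
        checkLive();
        if (--refCount == 0) {
            freed = true;
        }
        return freed;
    }

    int refCount() { return refCount; }
    boolean isFreed() { return freed; }
    int capacity() { return data.length; }

    private void checkLive() {
        if (freed) {
            throw new IllegalStateException("buffer already freed");
        }
    }
}

/** Toy logical view over part of a PhysicalBuffer, such as one vector's data. */
final class LogicalSlice {
    private final PhysicalBuffer parent;
    final int offset;
    final int length;
    private boolean released;

    LogicalSlice(PhysicalBuffer parent, int offset, int length) {
        if (offset < 0 || length < 0 || offset + length > parent.capacity()) {
            throw new IndexOutOfBoundsException();
        }
        parent.retain(); // each slice keeps the shared memory alive
        this.parent = parent;
        this.offset = offset;
        this.length = length;
    }

    void release() {
        if (released) {
            throw new IllegalStateException("slice already released");
        }
        released = true;
        parent.release();
    }
}

final class ReceiverSketch {
    /** Carve one incoming buffer into per-vector slices, then drop the receiver's own reference. */
    static LogicalSlice[] materialize(PhysicalBuffer incoming, int[] lengths) {
        LogicalSlice[] slices = new LogicalSlice[lengths.length];
        int offset = 0;
        for (int i = 0; i < lengths.length; i++) {
            slices[i] = new LogicalSlice(incoming, offset, lengths[i]);
            offset += lengths[i];
        }
        incoming.release(); // the slices now hold the only remaining references
        return slices;
    }
}
```

Note that a count like this answers only "is anyone still using the memory?". It does not say which operator the memory should be charged to, which is the separate ownership question discussed in this thread.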
Re: [DISCUSS] batch ownership
Specific answers based on my understanding.

> I did not mean that a pass-through operator should not take the ownership of a batch it processes. My question was whether they do so and if they do, when and how.

Yes, operators do take ownership, somewhere in the process of calling next() on their inputs. The exact place may vary between operators. In the Sort, for example, the code first checks the incoming batch size, spills sorted batches if needed to make space, then takes ownership. I'd go so far as to say that, if an operator does not take ownership, then it is a bug.

> As far as I can see in the ProjectorTemplate code, the transfer is not done in all cases and when Projector operates in sv2 mode, there is no transfer of the ownership.

Template code is code that is copied for each generated operator. In general, this code should be minimal. Code that is common to all operator instances should not reside in the template. Instead, it should reside in the operator (the so-called RecordBatch). There is really no reason to copy the same byte codes over and over, taking up space in the code cache.

That said, the code to take ownership is likely to be in the Project operator implementation. Look for a place that works with "transfer pairs"; they are the actual transfer mechanism. A quick glance at the code suggests this is done in ProjectRecordBatch.setupNewSchemaFromInput(). (An unfortunate name if we also do transfers.)

> Additionally, when there is a transfer, it is done when the processing of the batch is almost complete.

Depends on what you mean by "almost complete." Since Project is single-threaded, there is no harm in doing the transfer later rather than sooner; the upstream operator won't be called until Project again calls next(). It makes sense to do it earlier, but it is not necessary.
> IMO, such behavior is counter intuitive and I would expect that if there is a transfer of the ownership, it is part of RecordBatch.next(), meaning that once an operator gets a reference to a record batch, it owns it.

Perhaps. But, the Operator (that is, RecordBatch) protocol is a bit fussy. The next() call to RecordBatch tells that RecordBatch to build a batch of data and make it available. An operator has no visibility into its parent (its downstream operator). The caller must do the transfer, as only the caller has visibility into its own vector container and that of the upstream (incoming) record batch. Yes, this is quite confusing. Nothing beats stepping through several operators to see how this works in practice.

Here, I will put in a plug for the revised Operator classes in the "batch handling" code. The new classes try to disentangle the many bits of functionality combined in RecordBatch. There are three: 1) iterator protocol, 2) batch management, and 3) operator implementation. I believe we'll all understand this code better if we can separate these three concerns.

> At this point, an operator may consume content of the record batch and create a completely new record batch or it can modify the record batch and pass it to the next downstream operator.

Just to be clear, record batches (specifically vectors) are immutable. It is not possible to modify a record batch. One can, however, reuse parts of it. A Filter can slap on an SV2. A Project can discard some vectors, add others, and retain still others. But, in both cases, the operator must produce a new batch based on those vectors. Specifically, each operator has its own VectorContainer that contains its own vectors. Sharing occurs at the level of the DrillBufs that underlie the vectors. (Again, quite confusing, but it makes sense once you understand the operator allocators we discussed previously.)

Part of the complexity comes from proper memory management.
New vectors are allocated in the Project operator's allocator. Retained vectors are transferred from the upstream operator's allocator (ledger) to that of the Project operator. Discarded vectors are released (perhaps after being shifted into the Project operator's allocator.)

OK, again enough for one note. More to come.

Thanks,
- Paul
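As an illustration of how a Filter can "slap on an SV2" without modifying the incoming data, here is a minimal sketch. It assumes only what the message says: the data stays untouched and the filter's output is a list of row indexes layered on top. The class `Sv2Sketch` and its methods are hypothetical, and a plain `int[]` stands in for a value vector. Java's `char` is used for the indexes because it is an unsigned 16-bit type, which matches the two-byte entries that give the SV2 its name.

```java
/** Toy selection vector: 16-bit row indexes over an unmodified column. Hypothetical names. */
final class Sv2Sketch {

    /** Build the selection vector: indexes of the rows that pass the predicate. */
    static char[] select(int[] column, java.util.function.IntPredicate keep) {
        if (column.length > 65536) {
            throw new IllegalArgumentException("too many rows for 16-bit indexes");
        }
        char[] sv2 = new char[column.length];
        int selected = 0;
        for (int row = 0; row < column.length; row++) {
            if (keep.test(column[row])) {
                sv2[selected++] = (char) row; // record the row position, not the value
            }
        }
        return java.util.Arrays.copyOf(sv2, selected);
    }

    /** Read the i-th selected value through the indirection. */
    static int get(int[] column, char[] sv2, int i) {
        return column[sv2[i]];
    }
}
```

A caller would build the index list once per batch and then read rows through `get`. The column itself is never copied or rewritten, which is the sense in which the batch stays immutable while the operator still produces something new.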
Re: [DISCUSS] batch ownership
Hi Vlad,

Glad to see you are becoming an expert in the mechanics of data batch handling. This is a complex area that deserves the care and attention you are investing.

Drill's current behavior reflects the design decisions of Drill's original authors. Unfortunately, those authors are no longer available. (If you are out there, lurking, now would be a great time to help out Vlad by explaining the original design.) Failing that, we have to use our collective knowledge of the intended design. Plus, we should explore ways to improve the design, as you seem to be doing.

Drill has a complex memory model that works only if each operator ("record batch" in Drill's unfortunate terminology) takes ownership of each incoming record batch ("vector container" in Drill's terminology.) Recall that each operator has an operator-specific memory allocator with its own budget (though, at present, the budget numbers are completely artificial and nonsensical.) In addition, the minor fragment as a whole has a budget.

For the operator budget to work, the operator must take ownership of incoming batches, and give up ownership of outgoing batches. Why? Because doing so is the only way to track the memory that each operator uses in its operator-specific allocator. While this may not be the ideal design, it is how Drill works today.

If we move fully to the budget-based design, then this level of operator control will no longer be necessary, and will be an unnecessary complication. Under the budget model, only the minor fragment as a whole needs an allocator; each operator plays its part within the overall fragment budget. A planning step works out the memory budget for the query, the minor fragments and each operator. This is all explained in [1]. Under the budget model, each operator attempts to stay within its budget, spilling to disk as needed.

The budget model works only if "single batch" operators (such as Project, Filter, etc.) are given sufficient memory to hold two batches.
This, in turn, requires that we control the size of each batch, as Padma and others are doing.

That said, today exchanges *might* be special. My understanding is that some can receive a single batch from the network and feed that single batch to multiple slices ("minor fragments") of the same operator. This happens in, say, a broadcast exchange.

You mention SV2 mode. In fact, SV2 mode should operate the same as "plain" batches: an SV2 is a single indirection vector on a single batch of data. Perhaps you meant "SV4 mode." Indeed, SV4 is special since an SV4 sits atop a large collection of batches and simulates a batch by picking out a collection of rows across the many batches. SV4 is used in the output of an in-memory sort (and perhaps other places.) There is no transfer of ownership in SV4 mode because the same batches will be used over and over until all data is delivered. It is the responsibility of the Sort operator to release the collection of batches once it has delivered all results (or the query fails.)

Enough for this response. I'll send additional responses for your other points. The key concept to keep in mind is that the Drill memory system, as a whole, is quite complex. It can certainly be improved (as we are doing with the batch handling revisions.) But, we must consider the entire system when considering changes to any one part of the system. It is a complex topic; it is great that we have someone with your experience exploring our options.

Thanks,
- Paul

[1] https://github.com/paul-rogers/drill/wiki/Batch-Handling-Upgrades

On Sunday, April 29, 2018, 9:26:24 PM PDT, Vlad Rozov wrote:

I did not mean that a pass-through operator should not take the ownership of a batch it processes. My question was whether they do so and if they do, when and how. As far as I can see in the ProjectorTemplate code, the transfer is not done in all cases and when Projector operates in sv2 mode, there is no transfer of the ownership.
Additionally, when there is a transfer, it is done when the processing of the batch is almost complete. IMO, such behavior is counter intuitive and I would expect that if there is a transfer of the ownership, it is part of RecordBatch.next(), meaning that once an operator gets a reference to a record batch, it owns it. At this point, an operator may consume content of the record batch and create a completely new record batch or it can modify the record batch and pass it to the next downstream operator.

The behavior above applies to an operator that consumes record batches from another operator. An input operator (scan or edge operator) is an operator that produces record batches from an external source (parquet file, hbase, kafka, etc). IMO, when such operators create record batches they should allocate memory using operator allocator compared to fragment allocator. If the memory is allocated using fragment allocator, there is