The following page has been changed by AlanGates:
http://wiki.apache.org/pig/PigMemory

------------------------------------------------------------------------------
  Response: I agree that allowing assignments of memory pages to specific operators could be useful. I am concerned that Pig's planner is not sophisticated enough to make intelligent choices here. I would like to leave this as an area for future work.

+ = An Alternative Approach =
+ And now for something completely different.
+
+ There seems to be general agreement that changing Pig's tuples to store data as byte arrays rather than objects is a good idea. Initial tests indicate that storing data as
+ bytes and operating on it as objects is much too expensive in terms of serialization costs. The HBase team, who just went through a similar change, chose
+ to deal with this by keeping data in byte format at all times and handling serialization only on the client. Pig could not do this completely, as we need to honor the existing
+ UDF interfaces and would need to deserialize objects to pass them to UDFs. But we could change internal operators to work on bytes, change loaders to store bytes, etc. We
+ could even add a byte-oriented interface for UDFs to use if they wanted. HBase reported a 10-100x speedup when they did this. I do not expect a similar
+ speedup for Pig, but hopefully we would get some boost. This is a very extensive project, as it involves rewriting many Pig operators and built-in functions.
+
+ There also seems to be general agreement that having Pig manage a buffer pool is a bad idea, or at least too complicated to take on. One observation that is particularly
+ relevant to this is that usually the only case where Pig gets into memory issues is with large bags. If it is possible to solve memory issues only for large bags, then we do
+ not need such a large-scale general solution. The following proposal attempts to solve it just for bags.
+
+ There are two main cases where bags grow too large. One, the user's data contains large bags, so that Pig is unable to load even a single record in the map phase. This is less
+ common. Two, one or a few groups on the reduce side are so large that they cannot fit into memory. This is quite common, and affects joins and group bys where the UDFs are not
+ algebraic.
+
+ Hadoop has recently (in trunk, not yet in a release) added a new !MarkableIterator class. This iterator allows the user to set a mark in the iterator provided by the reduce and
+ at a later point reset to that mark. The way this is implemented (as explained to me by Owen O'Malley) is that it remembers its position in the list when the user marks. When
+ the user resets, it restarts the merge, going until it hits the mark. This means that, one, it never writes to disk or runs out of disk space, and two, the reset is not free,
+ because it rereads from disk and re-merges the data.
+
+ I propose to use this iterator as follows. A new type of bag will be created, !MarkableBag. It will be used by !POPackage and !POJoinPackage to store records coming out of the
+ reduce. It will store X number of records in memory (see below for details on how X will be decided). Once it reaches X, it will quit reading records from the iterator and
+ place a mark at that point in the iterator. The !MarkableBagIterator returned by '''MarkableBag.iterator()''' will return those tuples in memory, and then use the
+ underlying reduce iterator to read any remaining records. More details on how this will work:
+
+ {{{
+
+ class MarkableBag implements DataBag {
+
+     Collection<Tuple> tuples;   // the first X records, held in memory
+     MarkableIterator iter;      // reduce iterator, marked after the first X records
+     long guid = 0;              // guid of the iterator currently allowed to read
+
+     // All the regular bag functions
+
+     private long generateGuid(); // generate a guid.
+
+     class MarkableBagIterator implements Iterator<Tuple> {
+         MarkableBag parent;
+         Iterator<Tuple> memIter;  // walks the in-memory tuples
+         long guid;
+
+         MarkableBagIterator(MarkableBag parent) {
+             this.parent = parent;
+             memIter = parent.tuples.iterator();
+             guid = parent.generateGuid();
+             // no need for locking, as this is single threaded
+             parent.guid = guid;
+             parent.iter.reset();
+         }
+
+         boolean hasNext() { ... }
+
+         Tuple next() {
+             // a newer iterator has taken over the bag
+             if (parent.guid != guid) throw new ConcurrentModificationException();
+
+             if (memIter.hasNext()) return memIter.next();  // still in memory
+             else return parent.iter.next();
+         }
+     }
+ }
+ }}}
+
+ The limitation here is that only one iterator can read from the bag at a time. Given that Pig's use case is that the bag be handed to each UDF in turn, this restriction is acceptable.
+ The only case we cannot support is where a given UDF wants to read the bag with multiple iterators at a time. I am not aware of any UDF currently doing this, but a user might
+ want to.
+
+ The !MarkableBag will not work for large tuples in the input data, and will not work for UDFs that wish to have multiple iterators into the bag at the same time. For this
+ reason we will add a second type of bag, !SpillableBag. Like !MarkableBag it will store X number of records in memory, and then spill remaining records to disk. It will be
+ used in the map phase to store any user-generated data. If a UDF in the reduce phase implements a new interface !ReadConcurrently (I'm open to a better name here) then
+ !SpillableBag will be used.
+
+ The difficult question is how to define X (that is, the number of tuples held in memory in each bag). There are two options. One is to make X an absolute number. It
+ would be a Java property, and would default to, say, 1000. The advantages are simplicity and speed (the bag just needs to check its count). Two is to make X a number of bytes,
+ again a Java property, which would default to a size of a few megabytes. A bag would estimate the size of each tuple in it (by sampling a few) and stop holding new tuples in memory when it reaches a certain limit. This is slightly more
+ precise, but much more expensive. I would vote for starting with option one.
+
+ The current !SpillableMemoryManager and the bags that register with it would be removed.
+
+ The main advantage of this approach is that it is much simpler than the page cache mechanism proposed above. It also removes the need for the proposed !ReadOnce and
+ Accumulating UDF interfaces (see https://issues.apache.org/jira/browse/PIG-844 and https://issues.apache.org/jira/browse/PIG-807). Finally, it removes the current spilling
+ mechanism, which is faulty and a performance drag.
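The following is a minimal, runnable sketch of how option one (X as an absolute record count) and the one-reader-at-a-time check might fit together. It is not Pig or Hadoop code. `ReplayableSource`, `ListSource`, and `CountLimitedBag` are hypothetical names invented for this illustration, and `ListSource` is only an in-memory stand-in for a reduce iterator that supports mark and reset. A simple counter plays the role of the guid.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical stand-in for a reduce-side iterator that supports mark/reset. */
interface ReplayableSource<T> extends Iterator<T> {
    void mark();   // remember the current position
    void reset();  // return to the most recent mark
}

/** In-memory implementation, present only so the sketch can run. */
final class ListSource<T> implements ReplayableSource<T> {
    private final List<T> data;
    private int pos = 0;
    private int marked = 0;

    ListSource(List<T> data) { this.data = data; }

    public boolean hasNext() { return pos < data.size(); }

    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        return data.get(pos++);
    }

    public void mark() { marked = pos; }
    public void reset() { pos = marked; }
}

/** Holds at most 'limit' records in memory; the rest are re-read from the source. */
final class CountLimitedBag<T> implements Iterable<T> {
    private final List<T> inMemory = new ArrayList<>();
    private final ReplayableSource<T> source;
    private long currentGuid = 0;  // id of the only iterator allowed to read

    CountLimitedBag(ReplayableSource<T> source, int limit) {
        this.source = source;
        // Option one: the bag only needs to check its count.
        while (inMemory.size() < limit && source.hasNext()) {
            inMemory.add(source.next());
        }
        source.mark();  // everything after this point stays in the source
    }

    public Iterator<T> iterator() {
        final long myGuid = ++currentGuid;  // the newest iterator takes ownership
        source.reset();                     // rewind the source to the mark
        return new Iterator<T>() {
            private int idx = 0;

            private void checkOwner() {
                if (myGuid != currentGuid) {
                    throw new ConcurrentModificationException("a newer iterator owns this bag");
                }
            }

            public boolean hasNext() {
                checkOwner();
                return idx < inMemory.size() || source.hasNext();
            }

            public T next() {
                checkOwner();
                if (idx < inMemory.size()) return inMemory.get(idx++);  // still in memory
                return source.next();                                   // past the mark
            }
        };
    }
}
```

In this sketch, a bag built over five records with a limit of two can be iterated repeatedly, one reader after another, and each pass sees all five records. An iterator that is still in use when a newer one is created fails on its next call instead of silently reading from the wrong position.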
