Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The following page has been changed by AlanGates:
http://wiki.apache.org/pig/PigMemory

------------------------------------------------------------------------------
  Response:  I agree that allowing assignments of memory pages to specific operators could be useful.  I am concerned that Pig's planner is not sophisticated enough to make intelligent choices here.  I would like to leave this as an area for future work.
  
+ = An Alternative Approach =
+ And now for something completely different.
+ 
+ There seems to be general agreement that changing Pig's tuples to store data as byte arrays rather than objects is a good idea.  Initial tests indicate that storing data as bytes but operating on it as objects is much too expensive in terms of serialization costs.  In talking with the HBase team, who recently went through a similar change, I learned that their approach was to keep data in byte format at all times and handle serialization only on the client.  Pig cannot do this completely, because we need to honor the existing UDF interfaces and so must deserialize objects to pass them to UDFs.  But we could change internal operators to work on bytes, change loaders to store bytes, etc.  We could even add a byte-oriented interface for UDFs to use if they want.  HBase reported a 10-100x performance speedup when they did this.  I do not expect a similar speedup for Pig, but hopefully we would get some boost.  This is a very extensive project, as it involves rewriting many Pig operators and built-in functions.
+ 
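+ To make the byte-oriented idea a bit more concrete, here is a minimal sketch of what such an optional UDF-facing interface might look like.  The interface name and method signature below are placeholders of my own, not a settled design.
+ 
+ {{{
+     // Sketch only: a hypothetical optional interface a UDF could implement to
+     // receive a field as raw serialized bytes, skipping the object
+     // deserialization step.  Name and signature are illustrative, not an API.
+     import java.io.IOException;
+ 
+     public interface BytesOrientedFunc {
+         // Called with the still-serialized field; the UDF decodes only what it
+         // actually needs and returns its result.
+         Object exec(byte[] serializedField) throws IOException;
+     }
+ }}}
+ 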
+ There also seems to be general agreement that having Pig manage a buffer pool is a bad idea, or at least too complicated to take on.  One observation that is particularly relevant here is that usually the only case where Pig gets into memory trouble is with large bags.  If it is possible to solve memory issues just for large bags, then we do not need such a large-scale general solution.  The following proposal attempts to solve it just for bags.
+ 
+ There are two main cases where bags grow too large.  One, the user's data contains bags so large that Pig cannot hold even a single record in memory in the map phase.  This is less common.  Two, one or a few groups on the reduce side are so large that they cannot fit into memory.  This is quite common, and affects joins and group bys where the UDFs are not algebraic.
+ 
+ Hadoop has recently added (in trunk, not yet in a release) a new !MarkableIterator class.  This iterator allows the user to set a mark in the iterator provided by the reduce and at a later point reset to that mark.  The way this is implemented (as explained to me by Owen O'Malley) is that it remembers its position in the list when the user marks.  When the user resets, it restarts the merge, reading until it hits the mark.  This means that, one, it never writes to disk or runs out of disk space, and, two, it is not free, because resetting rereads from disk and remerges the data.
+ 
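+ As a rough illustration of how Pig's reduce-side code might drive this (sketch only; the exact trunk signatures of !MarkableIterator, and the helper names below, are assumptions on my part):
+ 
+ {{{
+     // Sketch only: buffer the first X records, mark, stream the rest, then
+     // reset and make a second pass.  Assumes org.apache.hadoop.mapreduce.
+     // MarkableIterator wraps the reducer's value iterator as described above;
+     // process() is a stand-in for whatever consumes the records.
+     void readGroupTwice(Iterable<NullableTuple> values, int x) throws IOException {
+         MarkableIterator<NullableTuple> mitr =
+             new MarkableIterator<NullableTuple>(values.iterator());
+ 
+         List<NullableTuple> buffer = new ArrayList<NullableTuple>();
+         for (int i = 0; i < x && mitr.hasNext(); i++) {
+             buffer.add(mitr.next());    // first X records stay in memory
+         }
+         mitr.mark();                    // remember this position in the merge
+ 
+         while (mitr.hasNext()) {
+             process(mitr.next());       // first pass over the remaining records
+         }
+ 
+         mitr.reset();                   // restart the merge, fast-forward to the mark
+         while (mitr.hasNext()) {
+             process(mitr.next());       // second pass over the same records
+         }
+     }
+ }}}
+ 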
+ I propose to use this iterator as follows.  A new type of bag, !MarkableBag, will be created.  It will be used by !POPackage and !POJoinPackage to store records coming out of the reduce.  It will hold X records in memory (see below for details on how X will be decided).  Once it reaches X, it will stop reading records from the iterator and place a mark at that point in the iterator.  The !MarkableBagIterator returned by '''MarkableBag.iterator()''' will return the in-memory tuples first, and then use the underlying reduce iterator to read any remaining records.  More details on how this will work:
+ 
+ {{{
+ 
+     class MarkableBag implements DataBag {
+ 
+         Collection<Tuple> tuples;      // the first X tuples, held in memory
+         MarkableIterator iter;         // reduce-side iterator, marked after X tuples
+         long guid = 0;                 // guid of the iterator currently allowed to read
+         private long nextGuid = 0;
+ 
+         // All the regular bag functions
+ 
+         private long generateGuid() { return ++nextGuid; } // generate a guid
+ 
+         class MarkableBagIterator implements Iterator<Tuple> {
+             MarkableBag parent;
+             long guid;
+             Iterator<Tuple> memIter;   // iterator over the in-memory tuples
+ 
+             MarkableBagIterator(MarkableBag parent) {
+                 this.parent = parent;
+                 guid = parent.generateGuid();
+                 // no need for locking, as this is single threaded
+                 parent.guid = guid;
+                 memIter = parent.tuples.iterator();
+                 parent.iter.reset();   // rewind the reduce iterator to the mark
+             }
+ 
+             public boolean hasNext() { ... }
+ 
+             public Tuple next() {
+                 // a newer iterator has taken over the underlying reduce iterator
+                 if (parent.guid != guid) throw new ConcurrentModificationException();
+ 
+                 if (memIter.hasNext()) return memIter.next();  // still in memory
+                 else return parent.iter.next();                // read past the mark
+             }
+         }
+     }
+ }}}
+ 
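+ For illustration, handing the bag to two consumers in turn would look roughly like this (sketch only; the constructor and helper names are placeholders):
+ 
+ {{{
+     // Sketch only: each call to iterator() resets the underlying reduce
+     // iterator to the mark, which invalidates the previous MarkableBagIterator
+     // (it will throw ConcurrentModificationException if used again).
+     MarkableBag bag = new MarkableBag(markableReduceIterator);
+ 
+     Iterator<Tuple> it1 = bag.iterator();
+     while (it1.hasNext()) { consumeForFirstUdf(it1.next()); }
+ 
+     Iterator<Tuple> it2 = bag.iterator();   // rereads from the mark via reset()
+     while (it2.hasNext()) { consumeForSecondUdf(it2.next()); }
+ }}}
+ 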
+ The limitation here is that only one iterator can read from the bag at a time.  Given that Pig's use case is that the bag is handed to each UDF in turn, this restriction is acceptable.  The only case we cannot support is a UDF that wants to read the bag with multiple iterators simultaneously.  I am not aware of any UDF that currently does this, but a user might want to.
+ 
+ The !MarkableBag will not work for large tuples in the input data, and will not work for UDFs that wish to have multiple iterators into the bag at the same time.  For this reason we will add a second type of bag, !SpillableBag.  Like !MarkableBag it will keep X records in memory, and then spill remaining records to disk.  It will be used in the map phase to store any user-generated data.  If a UDF in the reduce phase implements a new interface, !ReadConcurrently (I'm open to a better name here), then !SpillableBag will be used.
+ 
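+ A minimal sketch of what the marker interface and the spill behavior might look like (the names, and the exact spill mechanics, are placeholders per the open naming question above):
+ 
+ {{{
+     // Sketch only: a UDF implements ReadConcurrently to signal that it needs
+     // multiple simultaneous iterators over its input bag, so Pig should build
+     // a SpillableBag rather than a MarkableBag for it.
+     public interface ReadConcurrently {
+         // marker interface, no methods
+     }
+ 
+     // Sketch of SpillableBag's core behavior: keep the first X tuples in
+     // memory and write everything after that to a local spill file, which can
+     // be re-read by any number of iterators.
+     class SpillableBag implements DataBag {
+         private List<Tuple> inMemory = new ArrayList<Tuple>();
+         private long x;                  // in-memory limit, from a Java property
+ 
+         public void add(Tuple t) {
+             if (inMemory.size() < x) inMemory.add(t);
+             else writeToSpillFile(t);    // hypothetical helper
+         }
+ 
+         // iterator() returns the in-memory tuples, then reads back the spill
+         // file; remaining DataBag methods omitted.
+         ...
+     }
+ }}}
+ 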
+ The difficult question is how to define X (that is, the number of tuples held in memory in each bag).  There are two options.  One is to make X an absolute count.  It would be a Java property, and would default to, say, 1000.  The advantages are simplicity and speed (the bag just needs to check its count).  Two is to make X a number of bytes, again a Java property, defaulting to a few megabytes.  A bag would estimate the size of each tuple it holds (by sampling a few) and stop holding new tuples in memory when it reaches that limit.  This is slightly more precise, but much more expensive.  I would vote for starting with option one.
+ 
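+ Under option one the check is cheap; something like the following (the property name and default below are placeholders, not decided):
+ 
+ {{{
+     // Sketch only: read the in-memory tuple limit from a Java property,
+     // defaulting to 1000.  The property name is a placeholder.
+     static final long X = Long.getLong("pig.bag.memtuples", 1000L);
+ 
+     // In the bag's add path: cache tuples until the count hits X, then stop
+     // caching and let the mark/reset (or spill) machinery handle the rest.
+     if (tuples.size() < X) {
+         tuples.add(t);
+     } else if (!marked) {
+         iter.mark();    // MarkableBag case: set the mark in the reduce iterator
+         marked = true;
+     }
+ }}}
+ 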
+ The current !SpillableMemoryManager and the bags that register with it would 
be removed.
+ 
+ The advantages of this approach are that it is much simpler than the page cache mechanism proposed above.  It also removes the need for the proposed !ReadOnce and Accumulating interfaces for UDFs (see https://issues.apache.org/jira/browse/PIG-844 and https://issues.apache.org/jira/browse/PIG-807).  And it removes the current spilling mechanism, which is faulty and a performance drag.
