The following page has been changed by AlanGates:
http://wiki.apache.org/pig/PigMemory

New page:
= Pig Memory Usage Improvement =

== Problem Statement ==

 1. Pig hogs memory.  In version 0.2.0, data in memory takes 2-3x the space it 
occupies on disk.  This makes it hard for Pig to execute users' programs 
efficiently.  The expansion is caused largely by extensive use of Java objects 
(Integer, etc.) to store internal data.
 1. Java memory management and its garbage collector are poorly suited to the 
workload of intensive data processing.  Pig needs better control over where 
data is stored and when memory is deallocated.  For a complete discussion of 
this issue see M. A. Shah et al., ''Java Support for Data-Intensive Systems:  
Experiences Building the Telegraph Dataflow System''.

== Proposed Solution ==
Switching from using Java containers and objects to using large memory buffers 
and a page cache will address both of these issues.

=== Architecture ===
A basic page caching mechanism will be built.  Traditional OS page caches use 
fairly small pages (often a few kilobytes).  This cache will use large
pages (default size around 10 MB), for two reasons:

 1. It is convenient to constrain all of the scalar values in a tuple to fit 
into one buffer.  In order to do this the buffer sizes need to be large.
 1. Data in pig tends to be written all at once and read only one or two times. 
 Additionally, data reads generally consist of scans through all of the output 
of an operator.  So there should be relatively little need to bring pages in 
and out of memory repeatedly.

The page cache will consist of a !MemoryManager singleton, !TupleBuffers, and
!DataBuffers.  !TupleBuffer will contain a !DataBuffer.  It
will also contain the logic to manage spilling and reading the !DataBuffer from
disk.  The !MemoryManager will track !TupleBuffers
that are available for use and the !TupleBuffers that are full and eligible to 
be flushed to disk.  It will also track the total number
of !DataBuffers in memory.  The total memory that !DataBuffers may occupy, and 
hence the number that can be created, will be bounded by a configuration value.  
It should default to something like 250 MB (or maybe 70% of JVM memory if we can 
determine how much memory the whole JVM has been given).
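
As a rough sketch of how the bound on !DataBuffers might be derived from such a
setting: the class !BufferBudget, its method name, and its parameters are all 
hypothetical and only illustrate the arithmetic.  The 70% fallback is the 
figure floated above, not a tested value.

```java
/** Hypothetical helper: turns a memory budget into a DataBuffer count. */
final class BufferBudget {

    /** Fraction of the JVM heap used when no explicit limit is set (assumed). */
    static final double HEAP_FRACTION = 0.70;

    /**
     * @param configuredBytes explicit cache limit in bytes, or <= 0 if unset
     * @param bufferSizeBytes size of one DataBuffer in bytes
     * @param jvmMaxBytes     heap limit, e.g. Runtime.getRuntime().maxMemory()
     * @return the number of DataBuffers that fit in the budget, at least 1
     */
    static int maxDataBuffers(long configuredBytes, int bufferSizeBytes, long jvmMaxBytes) {
        long budget = configuredBytes > 0
                ? configuredBytes
                : (long) (jvmMaxBytes * HEAP_FRACTION);
        long count = budget / bufferSizeBytes;
        return (int) Math.max(1, Math.min(count, Integer.MAX_VALUE));
    }
}
```

With a 250 MB limit and 10 MB buffers this allows 25 buffers.  A caller could 
pass `Runtime.getRuntime().maxMemory()` as the third argument when no limit is 
configured.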

When a !TupleBuffer needs to create a new !DataBuffer (either to store new data 
or to read in flushed data from
disk) then the !TupleBuffer will request one from the !MemoryManager.  If 
the maximum number of !DataBuffers already exists,
then the !MemoryManager will select, via an LRU algorithm, a !TupleBuffer that 
is full and in memory and request that it flush its data
to disk.  Once the flush completes, the !MemoryManager will take the freed 
!DataBuffer and return it to the requesting !TupleBuffer.
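
The request path described above can be sketched in a few lines.  This is an 
illustration under simplifying assumptions, not the proposed implementation: 
!SketchPage and !LruBufferPool are hypothetical names, the flush to disk is 
reduced to a comment, and LRU order is tracked with an access-ordered 
`LinkedHashMap`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;

/** Hypothetical stand-in for a TupleBuffer: owns a byte[] while resident. */
final class SketchPage {
    final int id;
    byte[] data;        // null once the page has been flushed
    boolean flushed;

    SketchPage(int id) {
        this.id = id;
    }
}

/** Hypothetical stand-in for the MemoryManager's buffer accounting. */
final class LruBufferPool {
    private final int maxBuffers;
    private final int bufferSize;
    private int created;
    private final Deque<byte[]> free = new ArrayDeque<>();
    // accessOrder = true: iteration runs from least to most recently used.
    private final LinkedHashMap<Integer, SketchPage> full =
            new LinkedHashMap<>(16, 0.75f, true);

    LruBufferPool(int maxBuffers, int bufferSize) {
        this.maxBuffers = maxBuffers;
        this.bufferSize = bufferSize;
    }

    /** Records that a page is full and may be chosen as an eviction victim. */
    void markFull(SketchPage p) {
        full.put(p.id, p);
    }

    /** Records a read of a full page, moving it to the most recent end. */
    void touch(SketchPage p) {
        full.get(p.id);
    }

    /** Returns a buffer: a free one, a new one, or one taken from the LRU page. */
    byte[] acquire() {
        if (!free.isEmpty()) {
            return free.pop();
        }
        if (created < maxBuffers) {
            created++;
            return new byte[bufferSize];
        }
        Iterator<SketchPage> it = full.values().iterator();
        if (!it.hasNext()) {
            throw new IllegalStateException("no full page available to evict");
        }
        SketchPage victim = it.next();
        it.remove();
        byte[] reclaimed = victim.data;
        // A real implementation would write reclaimed to disk here before reuse.
        victim.data = null;
        victim.flushed = true;
        return reclaimed;
    }
}
```

Note that pages which are still accepting tuples are never on the `full` map, 
so they cannot be evicted; only pages passed to `markFull` are candidates.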

In addition to this page caching system, new implementations of the Tuple and
Bag interfaces will be written.  !ManagedTuple will store
scalar objects and maps in !TupleBuffers as serialized data.  It will obtain
!TupleBuffers from the !MemoryManager, thus allowing many
tuples to share one !TupleBuffer.  This should significantly reduce the memory 
footprint.  One tuple will be
stored entirely within a given !TupleBuffer.  As data is written to a 
!TupleBuffer, some amount of space will be saved in the buffer to
allow tuples in that buffer to expand.  (Question, do we really need this?  How 
often do tuples grow?  The case I can think of for this
is when a value in a tuple is cast from one type to another.)  If a tuple
cannot fit entirely in a given !TupleBuffer it will request a
new !TupleBuffer to store its data in.  The !ManagedTuple will store an array
of offsets into the !TupleBuffer's !DataBuffer corresponding
to each scalar and map field.  References to tuples and bags will be stored in 
a separate array of Objects.  There is no requirement
that the tuples and bags inside a tuple use the same !TupleBuffer as the outer 
tuple.
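
To make the offset scheme concrete, here is a minimal sketch of two tuples 
sharing one buffer.  !SharedBufferSketch and !OffsetTupleSketch are 
hypothetical names.  The sketch leaves out the per-field type byte, the space 
reserved for growth, and complex fields, and it length-prefixes strings with 4 
bytes as an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical stand-in for a TupleBuffer: one byte array shared by many tuples. */
final class SharedBufferSketch {
    private final ByteBuffer buf;

    SharedBufferSketch(int capacity) {
        buf = ByteBuffer.allocate(capacity);
    }

    /** Appends bytes and returns their start offset, or -1 if they do not fit. */
    int write(byte[] bytes) {
        if (bytes.length > buf.remaining()) {
            return -1;
        }
        int start = buf.position();
        buf.put(bytes);
        return start;
    }

    int readInt(int offset) {
        return buf.getInt(offset);  // absolute read, no object created
    }

    String readString(int offset) {
        int len = buf.getInt(offset);
        return new String(buf.array(), offset + 4, len, StandardCharsets.UTF_8);
    }
}

/** Hypothetical stand-in for ManagedTuple: holds only offsets, not values. */
final class OffsetTupleSketch {
    private final SharedBufferSketch buf;
    private int[] fieldOffsets = new int[0];

    OffsetTupleSketch(SharedBufferSketch buf) {
        this.buf = buf;
    }

    void appendInt(int v) {
        append(ByteBuffer.allocate(4).putInt(v).array());
    }

    void appendString(String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        append(ByteBuffer.allocate(4 + utf8.length).putInt(utf8.length).put(utf8).array());
    }

    private void append(byte[] bytes) {
        int offset = buf.write(bytes);
        if (offset < 0) {
            // A real tuple would move itself to a new buffer at this point.
            throw new IllegalStateException("buffer full");
        }
        fieldOffsets = Arrays.copyOf(fieldOffsets, fieldOffsets.length + 1);
        fieldOffsets[fieldOffsets.length - 1] = offset;
    }

    int getInt(int field) {
        return buf.readInt(fieldOffsets[field]);
    }

    String getString(int field) {
        return buf.readString(fieldOffsets[field]);
    }

    int offsetOf(int field) {
        return fieldOffsets[field];
    }
}
```

Each tuple object then costs one reference and one small int array, while the 
field values themselves live in the shared byte array.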

!ManagedTuple will handle translating between serialized data in !TupleBuffers 
and Java objects used in the Tuple interface.  Hopefully
with boxing for numeric types this will be relatively fast.  Strings and maps 
will be somewhat slower.  Changes will also be necessary
to !DataByteArray to allow it to reference underlying data with an offset and a 
length, so that it need not copy bytes in and out of the
!TupleBuffer.  A new implementation of Map will also be necessary.  This
!ManagedMap will lay out its keys and values in a !TupleBuffer
and support reading values from that serialized data.  In the future we 
will investigate changing the Tuple interface to include non-object get methods 
for numeric types 
so that the tuple can return the data without the need of going through an 
object conversion.  This interface could be used by
arithmetic operators which operate on numbers anyway.
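
The offset-and-length idea for !DataByteArray could look roughly like the 
following.  !ByteSliceSketch is a hypothetical class written for illustration; 
it is not the existing !DataByteArray API, and the UTF-8 decoding is an 
assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical read-only view over part of a larger byte array. */
final class ByteSliceSketch {
    private final byte[] backing;
    private final int offset;
    private final int length;

    ByteSliceSketch(byte[] backing, int offset, int length) {
        if (offset < 0 || length < 0 || offset > backing.length - length) {
            throw new IndexOutOfBoundsException("slice outside backing array");
        }
        this.backing = backing;   // shared, not copied
        this.offset = offset;
        this.length = length;
    }

    int size() {
        return length;
    }

    byte get(int i) {
        if (i < 0 || i >= length) {
            throw new IndexOutOfBoundsException("index " + i);
        }
        return backing[offset + i];
    }

    /** Copies only when the caller explicitly asks for an independent array. */
    byte[] toByteArray() {
        return Arrays.copyOfRange(backing, offset, offset + length);
    }

    String asUtf8() {
        return new String(backing, offset, length, StandardCharsets.UTF_8);
    }
}
```

Because the view shares the backing array, it is only valid while the 
underlying buffer is in memory and its contents have not been recycled.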

In order to facilitate this translation to objects, !ManagedTuple will store 
one byte with each field that records the type of the
field, and whether or not the field is null.  In the future we can investigate
an optimized version of !ManagedTuple that takes a schema
and thus avoids the need to store type info for every tuple.  In this case null 
information could be stored efficiently in a bit
vector.
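
One possible encoding of the per-field byte, plus the bit-vector alternative, 
is sketched below.  The type codes, the use of the high bit as the null flag, 
and the class name !FieldHeaderSketch are all assumptions made for 
illustration.

```java
import java.util.BitSet;

/** Hypothetical encoding: low 7 bits hold the type, the high bit marks null. */
final class FieldHeaderSketch {
    static final int NULL_FLAG = 0x80;
    static final byte TYPE_INT = 1;        // hypothetical type codes
    static final byte TYPE_CHARARRAY = 2;

    static byte pack(byte type, boolean isNull) {
        if (type < 0) {
            throw new IllegalArgumentException("type code must fit in 7 bits");
        }
        return (byte) (isNull ? (type | NULL_FLAG) : type);
    }

    static byte typeOf(byte header) {
        return (byte) (header & 0x7F);
    }

    static boolean isNull(byte header) {
        return (header & NULL_FLAG) != 0;
    }

    /** Schema-based alternative: one bit per field instead of one byte. */
    static BitSet nullBits(Object[] fields) {
        BitSet bits = new BitSet(fields.length);
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] == null) {
                bits.set(i);
            }
        }
        return bits;
    }
}
```

With a schema, the type codes move out of the tuple entirely, so a tuple of 
eight fields needs one byte of null information rather than eight header bytes.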

=== Detailed Design ===

{{{
    import java.io.File;
    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    /**
     * This is just a wrapper so I can store a byte[] in a container.  Entire
     * class is package level as no one outside of the data package should
     * be interacting with it.
     */
    class DataBuffer {

        // Buffer size in bytes.  The property value is given in kilobytes,
        // so the default of 10240 yields 10 MB.
        static final int bufferSize = Integer.parseInt(
            System.getProperty("pig.memory.databuffer.size", "10240")) * 1024;

        final byte[] data = new byte[bufferSize];
    }

    /**
     * A buffer to manage large byte arrays, count tuples referencing them,
     * and manage spilling them to disk.  Entire class is
     * package level as no one outside of the data package should
     * be interacting with it.
     */
    class TupleBuffer {

        // number of tuples with references in the buffer
        private int refCnt;
        private int nextOffset;
        // Package level so others can see it without the overhead of a read
        // call.
        DataBuffer data;
        File diskCache;

        TupleBuffer() {
            data = MemoryManager.getMemoryManager().getDataBuffer();
            recycle();
        }

        void recycle() {
            diskCache = null;
            nextOffset = 0;
            refCnt = 0;
        }
            
        /**
         * Called by a tuple when it is going to store its data in this
         * TupleBuffer.
         */
        void attach() {
            refCnt++;
        }

        /**
         * Called by a tuple when it is deallocated.
         */
        void detach() {
            if (--refCnt < 1) {
                MemoryManager.getMemoryManager().recycle(this);
            }
        }

        /**
         * Write data to the buffer.
         * @param data bytes to write to the buffer
         * @return offset where data written starts
         */
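        int write(byte[] data) {
            bringIntoMemory();
            // Check for space before copying; -1 tells the caller to move
            // to another TupleBuffer.
            if (nextOffset + data.length > DataBuffer.bufferSize) return -1;
            int start = nextOffset;
            System.arraycopy(data, 0, this.data.data, start, data.length);
            nextOffset += data.length;
            return start;
        }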

        /**
         * Bring the buffer into memory.  This must be called
         * before the tuple begins to read the data.
         */
        void bringIntoMemory() {
            if (diskCache != null) {   // data is on disk
                data = MemoryManager.getMemoryManager().getDataBuffer();
                read the contents of diskCache into data.data;
                diskCache = null;
                // pushes the buffer back onto the full queue, so it can be
                // flushed again if necessary.
                MemoryManager.getMemoryManager().markFull(this);
            }
        }

        /**
         * Determine if there is space in the buffer for a new tuple.
         * @param size estimated size (in bytes) of the new tuple.
         * @return true if there's room
         */
        boolean isSpaceForNew(int size) {
            // Must be in memory, otherwise no one should be trying to add.
            assert diskCache == null;
            // fillThreshold is a placeholder name for a configurable fraction
            // that should probably default to around 0.8 so there's room for
            // growth.  We need to play with this and see what's optimal.
            boolean isspace =
                (double) (nextOffset + size) / DataBuffer.bufferSize < fillThreshold;
            if (!isspace) MemoryManager.getMemoryManager().markFull(this);
            return isspace;
        }

        /**
         * Determine if there is space in the buffer to expand
         * an existing tuple.
         * @param size bytes to write to the buffer.  This size
         * should be accurate, or at least a guaranteed upper bound.
         * @return true if there's room.
         */
        boolean isSpaceForGrowth(int size) {
            bringIntoMemory();
            return nextOffset + size <= DataBuffer.bufferSize;
        }

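        /**
         * Flush the data to disk.
         * @return the DataBuffer that was freed, so the MemoryManager can
         * reuse it
         */
        DataBuffer flush() {
            // Prefix and suffix are placeholders; IOException handling is
            // omitted here.
            diskCache = File.createTempFile("pig", ".buf");
            diskCache.deleteOnExit();
            write the first nextOffset bytes of data.data to diskCache;
            DataBuffer freed = data;
            data = null;
            return freed;
        }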

    }

    /**
     * A class to manage all of the memory and data buffers.  Entire class is
     * package level as no one outside of the data package should
     * be interacting with it.
     *
     * This class tracks TupleBuffers and DataBuffers.  We can create as many
     * TupleBuffers as necessary.  But the number of DataBuffers is bounded
     * by the available memory provided by the system.
     */
    class MemoryManager {

        private static MemoryManager self = null;

        private Set<TupleBuffer> availableTupleBuffers =
            new HashSet<TupleBuffer>();
        // Full buffers in the order they were filled or last paged in; the
        // head is the next candidate for flushing.
        private Deque<TupleBuffer> fullTupleBuffers =
            new ArrayDeque<TupleBuffer>();
        private Deque<DataBuffer> availableDataBuffers =
            new ArrayDeque<DataBuffer>();
        private int numCreatedDataBuffers;
        private int maxDataBuffers;

        static MemoryManager getMemoryManager() {
            if (self == null) {
                self = new MemoryManager();
            }
            return MemoryManager.self;
        }

        MemoryManager() {
            // Determine available memory based on configuration and
            // set maxDataBuffers accordingly.
        }

        /**
         * Mark a TupleBuffer as no longer available to store new tuples.
         * It may still have room to grow tuples currently stored in it.
         * The TupleBuffer will also be put on the full list so that it
         * is eligible for flushing to disk if necessary.
         * @param tb TupleBuffer to take off the available list.
         */
        void markFull(TupleBuffer tb) {
            availableTupleBuffers.remove(tb);
            fullTupleBuffers.addLast(tb);
        }

        /**
         * Mark a TupleBuffer as available to take new tuples.  All 
         * existing data in this tuple buffer will be dropped.
         */
        void recycle(TupleBuffer tb) {
            tb.recycle();
            availableTupleBuffers.add(tb);
        }

        /**
         * Get a TupleBuffer.  If there's one to use on the available list
         * then use it, otherwise create a new one.
         */
        TupleBuffer getTupleBuffer() {
            if (availableTupleBuffers.size() > 0) {
                return availableTupleBuffers.iterator().next();
            } else {
                TupleBuffer tb = new TupleBuffer();
                availableTupleBuffers.add(tb);
                return tb;
            }
        }

        /**
         * Get a DataBuffer.  If there's one to use on the available list
         * then return it.  Otherwise, if we have not yet reached the maximum
         * number of data buffers, create a new one.  As a last resort, tell
         * an existing full TupleBuffer to flush and then use its DataBuffer.
         */
        DataBuffer getDataBuffer() {
            if (availableDataBuffers.size() > 0) {
                return availableDataBuffers.pop();
            } else {
                if (numCreatedDataBuffers < maxDataBuffers) {
                    numCreatedDataBuffers++;
                    return new DataBuffer();
                } else {
                    TupleBuffer victim = fullTupleBuffers.removeFirst();
                    return victim.flush();
                }
            }
        }

    }

    /**
     * An implementation of Tuple that works with managed memory.  It stores
     * complex types (tuples, bags, maps) as objects in an internal array.
     * Scalar types are laid out in a TupleBuffer as raw data.  The format
     * of the layout is 1 byte for type, 4 bytes for length (byte array and
     * char array types only), then the data.
     */
    class ManagedTuple implements Tuple {

        // Reference to the TupleBuffer holding data for this tuple.
        TupleBuffer buf;

        // Offsets into the tuple buffer for each field.  A negative value
        // indicates that the field is a complex type; it is stored as
        // -(index + 1), where index is the position in the complexFields
        // array, so that index 0 cannot be confused with buffer offset 0.
        // This is stored as an array to avoid the overhead of an ArrayList.
        private int[] fieldOffsets;

        // Array of complex fields (tuples, bags, maps) in this tuple.  Stored
        // as an array to avoid the overhead of an ArrayList.
        private Object[] complexFields;

        // Do not add any more member variables.  We want to keep the memory
        // footprint of ManagedTuple to an absolute minimum.

        ManagedTuple() {
            buf = MemoryManager.getMemoryManager().getTupleBuffer();
            buf.attach();
        }

        @Override
        protected void finalize() {
            buf.detach();
        }

        public void append(Object val) {
            if (val is complex) {
                grow fieldOffsets by 1;
                grow complexFields by 1;
                append val to complexFields;
                append -(index of val in complexFields + 1) to fieldOffsets;
            } else {
                if (buf.isSpaceForGrowth(val.sizeof())) {
                    grow fieldOffsets by 1;
                    buf.bringIntoMemory();
                    fieldOffsets[last] = buf.write(val.toBytes());
                } else {
                    move tuple to new TupleBuffer, adding new last field
                }
            }
        }

        public Object get(int fieldNum) {
            if (fieldOffsets[fieldNum] < 0) {
                return complexFields[-fieldOffsets[fieldNum] - 1];
            } else {
                buf.bringIntoMemory();
                determine type from buf.data.data[fieldOffsets[fieldNum]];
                instantiate correct type of object;
                return it;
            }
        }

        public void set(int fieldNum, Object val) {
            if (val is map) {
                m = new ManagedMap();
                m.putAll((Map)val);
                // may actually need to grow complex fields if this field
                // didn't use to be complex or if it was null.
                complexFields[-fieldOffsets[fieldNum] - 1] = m;
            } else if (val is tuple or bag) {
                // may actually need to grow complex fields if this field
                // didn't use to be complex or if it was null.
                complexFields[-fieldOffsets[fieldNum] - 1] = val;
            } else {
                if (buf.isSpaceForGrowth(val.sizeof())) {
                    buf.bringIntoMemory();
                    fieldOffsets[fieldNum] = buf.write(val.toBytes());
                } else {
                    move tuple to new TupleBuffer;
                }
            }
        }
    }

    /**
     * An implementation of Map for use with managed memory.  Package level
     * access because it will only be used by ManagedTuple.
     */
    class ManagedMap implements Map<String, Object> {

        // Reference to the TupleBuffer holding data for this map.
        private TupleBuffer buf;

        public Object put(String key, Object val) {
            // We can't amend an existing tuple.
            throw new UnsupportedOperationException();
        }

        public void putAll(Map<? extends String, ? extends Object> m) {
            buf.bringIntoMemory();
            write keys and values into buf, using length to delineate;
            value objects should be serialized the same as in tuple with a type;
        }

        public Object get(Object key) {
            buf.bringIntoMemory();
            look through the data, skipping key to key until we find the
            right key;
            deserialize the value out of the map into appropriate object
            type and return it;
        }
    }


}}}

A new class !ManagedBag will be created that does not extend 
!DefaultAbstractBag.  This bag will not support spilling.  It will
aggressively minimize the use of member variables to keep its memory footprint 
to a minimum.

== Proposed Methodology ==

As they say in the financial world, this document contains many statements that 
are forward looking and may or may not work out.  We
should prototype this along the way to assure ourselves that this will in fact 
bring the proposed improvement while maintaining
performance.  

Step one should be to prototype !ManagedTuple and !ManagedMap.  Then a stand 
alone tool can be written that will create tuples until it
runs out of memory.  It can then be run with existing !DefaultTuples and with
!ManagedTuples to see whether !ManagedTuples bring a significant improvement.  
The target is to cut the memory footprint roughly in 
half.  This test should be run with all scalar data and
with data that includes maps and bags of maps (since this is a common use case 
for our users).
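
A stand-alone probe of this kind might start from something like the sketch 
below.  !FootprintProbe and its method are hypothetical, and the `cap` 
parameter is an addition so the loop can also be bounded for quick runs.  
Catching `OutOfMemoryError` is normally discouraged; it is done here only 
because exhausting the heap is the point of the measurement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical probe: counts how many objects fit before the heap is exhausted. */
final class FootprintProbe {

    /**
     * Creates and retains objects until either cap objects have been created
     * or the JVM reports that it is out of memory.
     *
     * @param factory creates one tuple-like object per call
     * @param cap     upper bound on the number of objects to create
     * @return the number of objects successfully created and retained
     */
    static long countUntilFull(Supplier<Object> factory, long cap) {
        List<Object> held = new ArrayList<>();
        long count = 0;
        try {
            while (count < cap) {
                held.add(factory.get());
                count++;
            }
        } catch (OutOfMemoryError e) {
            // Release everything so the caller can report the count.
            held.clear();
        }
        return count;
    }
}
```

Running this once with a factory that builds !DefaultTuples and once with one 
that builds !ManagedTuples, under the same `-Xmx` setting and with `cap` set 
to `Long.MAX_VALUE`, would give two counts whose ratio approximates the 
difference in footprint.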

Assuming step one shows promising results, the next step should be to prototype 
the page caching system.  Queries that we know to
produce GC overhead type errors in existing code should be run with the page 
caching system to show that it properly handles the
situation with no GC overhead.

== Thoughts for Future Work ==
In the above referenced Telegraph paper the developers took the next step and 
managed not just the large memory buffers but even the
creation and deletion of objects.  This meant a couple of things:

 1. They needed to add explicit deallocation to their programming.  
 1. They needed to keep pools of available objects for recycling.

We may want to look at these options, though 1 in particular may be difficult 
as Java programmers (especially those who haven't
programmed in C++ or a similar language) have no concept of deallocating 
objects when they are finished with them.  However, they noted
that by totally circumventing the Java garbage collector they got around a 2.5x 
speedup of their system.  So it might be worth
investigating.
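
For reference, the two items above amount to a pattern like the following.  
This is a generic object pool written for illustration, not the Telegraph 
design; !ObjectPoolSketch and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.function.Supplier;

/** Hypothetical pool: callers release objects explicitly instead of relying on GC. */
final class ObjectPoolSketch<T> {
    private final ArrayDeque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;
    private int created;

    ObjectPoolSketch(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns a recycled object if one is idle, otherwise creates a new one. */
    T acquire() {
        T obj = idle.poll();
        if (obj == null) {
            created++;
            obj = factory.get();
        }
        return obj;
    }

    /** Explicit deallocation: the caller must not use obj after this call. */
    void release(T obj) {
        // The caller is responsible for resetting obj's state before reuse.
        idle.push(obj);
    }

    int createdCount() {
        return created;
    }
}
```

The burden this places on programmers is visible in `release`: forgetting to 
call it leaks pool capacity, and using an object after releasing it corrupts 
whoever acquires it next.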
