Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The following page has been changed by UtkarshSrivastava:
http://wiki.apache.org/pig/PigExecutionModel

------------------------------------------------------------------------------
  = Introduction =
  
- The goal is to decide how to structure operator evaluation pipelines in Pig. 
The major issues include whether data flow follows a push model or a pull 
model, whether an operator evaluation tree is multi-threaded or 
single-threaded, and what the API for user-defined functions (UDFs) looks like.
+ __Task__: Redesign Pig logical plan structure and execution engine
  
+ __Goals__:
+  * Address current inefficiencies.
+  * Open up new opportunities for optimization.
- The new model needs to support arbitrary operator DAGs (which may arise in a 
single pig program, or when jointly executing groups of interrelated programs).
+  * Support arbitrary operator DAGs (which may arise in a single pig program, 
or when jointly executing groups of interrelated programs).
  
- = Alternative Execution Models =
  
- Some possible execution models are:
+ == Logical Plan Structure ==
  
- == Model 1 ==
+ '''Current Problems''':
+  * We use operators to manipulate the outermost dataset, but eval specs to 
manipulate the nested data, which reduces code reuse and increases complexity.
+  * Eval specs are assumed to be a linear chain. Thus it makes doing splits 
and general DAGs difficult.
  
-    * One thread.
-    * Top-level scheduler round-robins through "leaf" operators.
-    * Each time an operator is invoked, it gets handed exactly one tuple.
-    * Special "split" operator buffers data that gets fed to multiple 
operators; at most one tuple gets buffered at each split point.
-    * UDF API: declare zero or one input bag as "streaming"; init() hands it 
all data except the streaming bag; next() hands one tuple from streaming bag
+ '''Proposal''':
+  1. Get rid of eval specs, make everything operators
+  1. Since Pig deals with nested data models and allows manipulation of nested 
data, it is only natural for the logical plan structure to be fully nestable, 
e.g. the `foreach` operator can have a nested query plan that it uses to 
process each input tuple.
+  1. Execute the outermost operators using their map-reduce implementations if 
any (see table below). Execute all nested query plans using local execution 
mode.
+  1. Add a split operator that replicates input data along multiple outgoing 
branches. This will help us to support multiple outputs and DAGs.
  
- == Model 2 ==
  
+ '''Advantages of a nested query plan''':
+     * Same operator set used for processing both the outermost, as well as 
nested data; no code duplication, easier to understand.
+     * Can reuse the local execution mode to process the nested query plans
+     * Can allow for generalization of Pig Latin in the future where the 
language within `FOREACH` can be the same as that outside it.
-    * One thread per "leaf" operator.
-    * Scheduling done by OS.
-    * Operator gets to read as many tuples as it wants; if it reads from 
multiple inputs, can interleave "next()" calls on the inputs in arbitrary 
fashion.
-    * Split operator may buffer up to K tuples (or B bytes); if an operator 
tries to read too far ahead it gets blocked until other operators reading from 
the same buffer catch up.
-    * Deadlock can arise; need to detect it and release it by relaxing the K/B 
constraint on one or more of the split buffers.
  
- == Discussion ==
  
- === Model 1 Drawbacks ===
  
+ Here is a list of proposed operators:
-    * underutilize multi-core systems? (depends whether the policy is to 
assign several map/reduce tasks to a machine)
-    * difficult (or impossible?) to support operations that require streaming 
access to multiple inputs (e.g., merge join, merge-based set difference, etc. 
which operate over pre-sorted input streams)
-    * UDF APIs more complex?
  
- === Model 2 Drawbacks ===
+ || '''Operator''' || '''Attributes''' || '''Number of inputs in query plan''' 
|| '''Semantics''' || '''Implementation (M-R vs local)''' ||
+ || LOAD || file names, load function || 0 || Loads the contents of the files 
using the given load function into a bag || Same (with file system abstraction 
layer ||
+ || STORE || file name, store function, in future a hierarchy of fields to 
partition by || 1 || Stores the input bag into the given file (in future, 
partitioned by the given field hierarchy) || Same (with file system abstraction 
layer) ||
+ || FOREACH || nested query plan || 1 || Applies the nested query plan to each 
item of the input bag to produce an output bag || Same ||
+ || GENERATE || None ||>=1 || Computes the output(s) of its child query 
plan(s), and concatenates them together into output tuple(s). This will 
typically be the topmost operator in the nested query plan within the FOREACH 
operator|| Same ||
+ || FILTER || nested query plan for condition || 1 || Applies the nested query 
plan to each item of the input bag. If the plan returns true, the item is put 
in the output bag, otherwise not. || Same ||
+ || GROUP/COGROUP || nested query plans, one for the grouping criteria of each 
input || >=1 || Applies the appropriate nested query plan to each item of each 
input to determine its group. Items within the same group are grouped together 
into bags || Different: M-R will use map-reduce, Local will use our new 
Sorted``Bag to sort and collect data into bags ||
+ || SORT || list of columns on which to sort, ASC, DESC flags for each column 
|| 1 || Orders the input bag to produce an output bag || Different: M-R will 
use quantiles and map-reduce to do a parallel sort, local will use Sorted``Bag. 
||
+ || DISTINCT (Blocking) || None || 1 || Eliminates duplicates in the input bag 
to produce an output bag || Different: M-R will rewrite into group/foreach, 
local will use Distinct``Bag. ||
+ || PROJECT || list of column numbers or schema names || 1 || Selects the 
specified columns of the input tuple into the output tuple || Same ||
+ || MAP_LOOKUP || a list of keys to lookup || 1 || Selects the contents of the 
specified keys into an output tuple || Same ||
+ || BINCOND || 3 nested query plans: 1 specifying condition, 1 specifying what 
to ouptut when condition is true, and 1 specifying what to output when 
condition is false || 1 || Same as conditional expression in C++/Java (a>b?c:d) 
|| Same ||
+ || COMPARISON (<, <=, >, >=, matches, ==, !=) || None || >=1 || Computes the 
output(s) of its child query plan(s), and compares them according to the 
specified logical operator, outputs a boolean || Same ||
+ || AND/OR/NOT || None || >=1 || Computes the (boolean) output(s) of its child 
query plan(s), and combines them according to the specified logical operator, 
outputs a boolean || Same ||
+ || CONSTANT || constant value || 0 || Outputs a constant value || Same ||
+ || UDF_APPLY || UDF to apply || >=1 || Computes the output(s) of its child 
query plans, assembles the results together into a tuple, and applies the UDF 
using that tuple as the argument. The result is passed on as output. || Same ||
+ || STREAM || program to invoke || >=1 (may have multiple outputs as well) || 
streams the input to the external program without waiting for output. The 
output arrives at some later point in time || Same ||
+ || SPLIT || None || 1 (only operator to have multiple outputs apart from 
STREAM) || replicates input along both output branches || different (depends on 
our choice of push vs pull model). If pull, M-R buffers to DFS, local buffers 
in memory (spilling to disk if necessary). ||
+ || UNION || none || >=1 || union of child query plans || Different (Map-side 
union will be no-op, reduce side will cause break in pipeline) In local mode, 
straightforward. ||
  
-    * thread synchronization overhead
-    * complexity of multi-threaded implementation
+ == Plan Execution ==
+ 
+ Each of the above logical operators will translate to a physical operator (in 
many cases, the physical operator will be shared between backends, as shown in 
the above table).
+ 
+ One physical operators have been linked together into a query plan, they must 
be executed. There is a choice of mainly 2 models for execution (assume that 
data flows downwards in an execution plan):
+ 
+  1. '''Push''': Operator A pushes data to B that operates on it, and pushes 
the result  to C.
+  2. '''Pull''': Operator C asks B for its next data item. If B has nothing 
pending to return, it asks A. When A returns a data item, B operates on it, and 
finally returns the result to C.
+ 
+ 
+ '''Pull API'''
+ {{{
+ public interface Pull<T> {
+     /*
+     * T is the return type. In the most general case
+     * it can be Object, so we might omit T altogether.
+     */
+ 
+     public void open();
+ 
+     public T getNext();
+ 
+     public void close();
+ }
+ }}}
+ 
+ '''Push API''':
+ {{{
+ public interface Push<T> {
+     /*
+     * T is the output type. In the most general case
+     * it can be Object, so we might omit T altogether.
+     */
+ 
+     public void open();
+ 
+     public putNext(T);
+ 
+     public void close();
+ }
+ }}}
+ 
+ 
+ Each model has its own unique advantages and disadvantages. Pull is more 
natural when there are multiple inputs, push is natural when there are multiple 
outputs. In the context of our operators, there are several places where one of 
the models is a natural fit, and is hard to replace.
+ 
+ '''Pull''' - Natural use in:
+  * UDFs already pull data by using an iterator.
+  * FILTER, GROUP/COGROUP, BINCOND : Evaluate their nested query plans using 
pull (can be converted to push, though unnatural).
+  * COMPARISON, AND/OR/NOT: Pull data because they have multiple inputs
+ 
+ '''Push''' - Natual use in:
+  * EvalFunc<Bag>: Eval functions push data (through bag.add()). If we go with 
a pull model, have to hold in memory any bag output by a function (seems 
reasonable).
+  * SPLIT: Multiple outputs are most naturally supported through the push 
model.
+ 
+ Disadvantages:
+ 
+ '''Pull''':
+  * Requires buffering whenever there are multiple outputs.
+  * Requires multiple passes over the data if multiple aggregates are to be 
computed.
+ 
+ '''Push''':
+  * No convenient API to push data to UDFs that have multiple inputs.
+ 
+ === Proposal ===
+ 
+  1. Single-threaded execution.
+  1. No API changes to UDFs in the short term. In the long term, we might 
introduce an aggregation function API that is push-based along the lines of the 
[http://www.postgresql.org/docs/8.2/static/sql-createaggregate.html Postgres 
user-defined aggregation function API.
+  1. Make the model entirely *push-based*. Reasons:
+     1. Accomodates multiple outputs more naturally.
+     1. Accomodates possible future change of having special aggregate 
function API to iterate over data only once.
+     1. Functions can still pull data; we will push an entire iterator to 
them, instead of pushing tuple by tuple.
+  1. Implement hadoop iterator cloning/rewind so that we don't have to do our 
own disk writes.
+  1. Before hadoop iterator cloning becomes available, we could even 
materialize bags in memory, as today and this model works.
+  1. When multiple iterators become available (either by reduce-side extension 
of hadoop), or when doing a map-side cogroup, it will fit nicely into this 
model.
+ 
+ 
+ === Use Cases ===
+ 
+ To be written
+ 
+ 
  
  === Related Reading ===
  
-    * Fjords paper 
+    * Fjords paper
        * paper: http://db.lcs.mit.edu/madden/html/madden_fjords.pdf
        * slides: 
http://www.cs.umd.edu/class/fall2002/cmsc818s/Lectures/fjords.pdf
-    * Stream Programming Model / MIT Stream-It 
+    * Stream Programming Model / MIT Stream-It
        * official page for stream-it: http://www.cag.csail.mit.edu/streamit/ 
(Articles on the compiler might be useful)
  

Reply via email to