Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 

The following page has been changed by GuntherHagleitner:

  Will result in a job that produces bar, then the mv and rm are executed. 
Finally, another job
  is run that will generate foo.
- [[Anchor(Phases)]]
- == Phases ==
- These are the identified steps to get the proposal implemented.
- [[Anchor(Phase_1)]]
- === Phase 1 ===
- Phase one is about getting some infrastructural things in place. 
-    * Batch execution (instead of single store execution)
-    * Merge logical plans into single
-    * Updated explain/illustrate
-    * Change local and hadoop engine to execute multiple store plans (with 
store-load per split).
- At the end of phase one, we'll have implicit splits. But the MR plan will be 
the same as if the user made the splits explicit.
- [[Anchor(Phase_2)]]
- === Phase 2 ===
- Phase two is about getting the MR Compiler to agree to this.
-    * Allow multiple stores in single job
-    * Merge multiple plans into the split operator
-    * Terminate all but one with stores
- [[Anchor(Phase_3)]]
- === Phase 3 ===
- Phase three is about using the combiner/reducer on multiple split branches in 
the same job
-    * Merge Combiner/Reducer plans
-    * Put in logic to decide when to multiplex pipelines
  == Internal Changes ==
- [[Anchor(Grunt_parser_(Phase_1))]]
+ [[Anchor(Grunt_parser)]]
- ==== GruntParser/PigServer (Phase 1) ====
+ ==== GruntParser/PigServer ====
  The parser currently uses a bottom up approach. When it sees a store (dump, 
explain), it goes bottom up and generates the plan that needs to happen for 
this particular store. In order to optimize the multi-query example, we need, 
however, a peek on the entire graph for a script (interactive mode is handled 
- The highlevel changes to the parser/server are:
+ The high-level changes to the parser/server are:
     * Not execute the plan when we see a store or dump
     * Alter the already existing merge functionality to allow intersecting 
graphs to be joined into a single logical plan.
@@ -539, +508 @@

  If the multi-query optimization is turned off all graphs will be generated as 
interactive, which is how we revert the behavior.
- The merging of the different logical plans is done in the OperatorPlan, where 
merges can now either check for disjoint graphs or merge them with overlaps.
+ The merging of the different logical plans is done in the OperatorPlan, where 
merges can now either check for disjoint graphs or merge them with overlaps. 
Merged plans are later passed on to the implicit split inserter at which point 
all the overlapping operators from the merge will result in an explicit split.
  Finally, the store-load handling is done in the pig server. It will either 
transform the plan or add a store-load connection. Absolute filenames will be 
available, since the QueryParser now translates them when it sees them.
  The grunt parser makes use of the new PigServer APIs. It will use the batch 
API to parse and execute scripts. Since scripts can be nested, the script 
execution and the stack of graphs in the PigServer are closely related.
- [[Anchor(Explain_(Phase_1))]]
+ [[Anchor(Explain)]]
- ==== Explain (Phase 1) ====
+ ==== Explain ====
  As described above the changes are:
@@ -564, +533 @@

  CoGroups are special in that their nested plans are connected to input 
operators. This is modeled as subgraphs as well and the subgraphs are connected 
to the respective input operator.
+ [[Anchor(Map Reduce Engine)]]
+ ==== Map Reduce Engine ====
+ Let's first look at how a multi-query is processed in the hadoop case. We 
have already discussed how the 
+ logical plan is generated, driven by the PigServer and GruntParser. The next 
step is translation of the logical plan
+ to the physical plan. This has not changed except that in the multi-query 
case it has to carry connections of store 
+ to load operator through to the physical plan.
+ [[Anchor(MRCompiler)]]
+ ===== MRCompiler =====
+ After that the MRCompiler takes the physical plan and breaks it into a plan 
of map-reduce operators. The handling of
+ split operators has not changed. What happens is:
+ When a split is encountered, it is removed from the plan and replaced by a 
store to a temporary path with BinStorage.
+ The split branches are placed in new map-reduce operators that load from the 
temporary path.
+ Some minor changes to the MRCompiler are:
+    * Edges from store to load operator are removed and their respective 
map-reduce operators are connected. That will enforce that the store job runs 
before the load job.
+    * Jobs that result from a split are marked as splitters and splittees, so 
that in the optimization we can look at those.
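
The split handling above can be illustrated with a minimal Python sketch. It is not the MRCompiler code: `MROper`, `break_at_split`, and the tuple encoding of operators are hypothetical names chosen for illustration, and the temporary path numbering is an assumption.

```python
from dataclasses import dataclass, field
from itertools import count

@dataclass
class MROper:
    """Hypothetical stand-in for a map-reduce operator."""
    name: str
    ops: list = field(default_factory=list)
    is_splitter: bool = False
    is_splittee: bool = False

_tmp_ids = count(1)  # assumption: temp paths are simply numbered

def break_at_split(upstream_ops, branches):
    """Replace a split by a temp store plus one loading job per branch."""
    tmp_path = f"/tmp/temp-{next(_tmp_ids)}"
    # The job feeding the split ends in a BinStorage store to the temp path.
    splitter = MROper(
        "splitter",
        list(upstream_ops) + [("store", tmp_path, "BinStorage")],
        is_splitter=True,
    )
    splittees, edges = [], []
    for i, branch in enumerate(branches):
        # Each branch becomes its own job that loads from the temp path.
        job = MROper(
            f"splittee-{i}",
            [("load", tmp_path, "BinStorage")] + list(branch),
            is_splittee=True,
        )
        splittees.append(job)
        # The edge makes the store job run before the load job.
        edges.append((splitter.name, job.name))
    return splitter, splittees, edges
```

The splitter/splittee flags are what a later optimizer pass would inspect when deciding which jobs to merge.
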
+ [[Anchor(MultiQueryOptimizer)]]
+ ===== MultiQueryOptimizer =====
+ After the compiler, the optimizers are run. The last one in the chain is the 
MultiQueryOptimizer. It looks at all splitters and splittees and merges them 
recursively, producing what was described above in ExecutionPlans. Since this 
step runs after the combiner optimizer, it can also merge combiners if 
necessary. If the optimizer cannot merge, it keeps the existing store that the 
split produced in the MRCompiler.
+ The merging of splittees into a splitter consists of:
+    * Creating a split operator in the map or reduce and setting the splittee 
plans as nested plans of the split
+    * If it needs to merge combiners it will introduce a Demux operator to 
route the input from mixed split branches in the mapper to the right combine 
plan. The separate combiner plans are the nested plans of the Demux operator
+    * If a map reduce operator does not have a combiner it will insert a 
FakeLocalRearrange operator to simply route the input through.
+    * If it needs to merge reduce plans, it will do so using the Demux 
operator the same way the combiner is merged.
+ Note: As an end result this merging will result in Split or Demux operators 
with multiple stores tucked away in their nested plans.
+ There are two more minor optimizers:
+    * NoopFilterRemover will remove constant-true filters that splits produce
+    * NoopStoreRemover will remove implicit stores to temporary files, if 
there is a user initiated store that contains the same information
+ [[Anchor(MapReduceLauncher)]]
+ ===== MapReduceLauncher =====
+ The map reduce launcher receives the map-reduce operator plan from the 
compilation stage. It uses the JobControlCompiler to compile map-reduce 
operators into hadoop job control objects and submits them to hadoop for 
+ The biggest change stems from the fact that with the current hadoop system we 
need to store multiple outputs from a single job in a temp directory first and 
then move them to their real locations. Map-reduce operators that are 
connected in the graph mean that the successor needs to read files that the 
parent produces. To do that, we need to submit the graph in several steps, and 
after each one we need to move the files to their final locations. So the 
overall job submission works like this:
+    * Remove all root operators
+    * Compile these operators into jobs
+    * Submit jobs to hadoop
+    * Move result files or collect failure
+    * Repeat unless map-reduce plan is empty
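
The submission loop above can be sketched in Python as follows. This is an illustration only: `run_in_waves`, `submit`, and `move_results` are hypothetical names, the plan is modeled as a plain dictionary, and the choice to record dependents of a failed job as failed without running them is an assumption the page does not spell out.

```python
def run_in_waves(plan, submit, move_results):
    """Run a job graph root-wave by root-wave.

    plan: dict mapping job name -> set of predecessor job names.
    submit: callable taking a list of job names, returning the failed ones.
    move_results: callable invoked once for each job that succeeded.
    """
    remaining = {job: set(deps) for job, deps in plan.items()}
    succeeded, failed = [], []
    while remaining:
        # Remove all root operators (jobs with no unfinished predecessors).
        roots = sorted(job for job, deps in remaining.items() if not deps)
        if not roots:
            raise ValueError("plan contains a cycle")
        for job in roots:
            del remaining[job]
        # Compile and submit this wave, then move results or collect failures.
        bad = set(submit(roots))
        for job in roots:
            if job in bad:
                failed.append(job)
            else:
                move_results(job)
                succeeded.append(job)
        # Assumption: a job whose input was never produced cannot run.
        while True:
            blocked = sorted(
                j for j, deps in remaining.items() if deps & set(failed)
            )
            if not blocked:
                break
            for j in blocked:
                del remaining[j]
                failed.append(j)
        # Finished jobs no longer block their successors.
        for deps in remaining.values():
            deps.difference_update(roots)
    return succeeded, failed
```

Moving the files after every wave is what lets the next wave's load operators find their input at the final location.
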
+ [[Anchor(JobControlCompiler)]]
+ ===== JobControlCompiler =====
+ The JobControlCompiler distinguishes between multi store and single store 
map-reduce operators. Single store operators are processed the same way as 
before: the store is removed and the job configuration is set up to put 
the records that come out of the pipeline in the right files.
+ Multi store plans are handled differently. The compiler sets the output path 
to a newly created temporary directory. It also leaves the store operators in 
the plan; these operators will, at execution time, create subdirectories of the 
temporary path and direct records to those subdirectories.
+ Let's assume we have 2 stores in the plan to directories "foo" and 
"/out/bar". After running the job one gets the following temporary directory 
+ {{{
+ /tmp/temp-<number>/abs/out/bar/part-*
+ /tmp/temp-<number>/rel/foo/part-*
+ }}}
+ The JobControlCompiler will then as a final step move the result files to 
their final locations.
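
The temporary layout shown above suggests a simple mapping between store paths and temporary subdirectories. The Python sketch below illustrates one way to express it; `temp_subdir`, `final_location`, and the `working_dir` parameter used to resolve relative paths are hypothetical and not taken from the Pig source.

```python
import posixpath

def temp_subdir(tmp_root, store_path):
    """Map a store path to its subdirectory under the temporary root."""
    if store_path.startswith("/"):
        # Absolute paths go under "abs", minus the leading slash.
        return posixpath.join(tmp_root, "abs", store_path.lstrip("/"))
    # Relative paths go under "rel" unchanged.
    return posixpath.join(tmp_root, "rel", store_path)

def final_location(tmp_root, temp_file, working_dir):
    """Invert temp_subdir for one result file (working_dir is an assumption)."""
    rel = posixpath.relpath(temp_file, tmp_root)
    kind, _, rest = rel.partition("/")
    if kind == "abs":
        return "/" + rest
    if kind == "rel":
        return posixpath.join(working_dir, rest)
    raise ValueError(f"unexpected temporary file: {temp_file}")
```

For example, with a temporary root of `/tmp/temp-42`, the store path `foo` maps to `/tmp/temp-42/rel/foo`, and a part file found there would be moved to `foo` under the working directory.
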
+ [[Anchor(Store Operator)]]
+ ===== Store Operator =====
+ Finally, after the MapReduceLauncher has submitted the jobs, execution starts 
on the backend.
+ The store operator changed to deal with multiple outputs from a single 
map-reduce job as well as enabling the local engine to handle multiple stores 
in the same plan.
+ The operator itself has a store implementation that is instantiated by the 
execution engine that handles the output. Store has a "getNext" function that 
stores the input record and returns a null record. That basically means that 
stores can be placed into an execution pipeline as an operator with a 
side-effect that will consume all input records. 
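
A minimal Python sketch of a store that behaves as a pipeline operator with a side effect is shown below. The class names, the `EOP` sentinel, the tab-separated output format, and the decision to pass the end-of-pipeline marker through unchanged are all assumptions made for illustration.

```python
import io

EOP = object()  # hypothetical end-of-pipeline marker

class ListSource:
    """Hypothetical upstream operator that hands out records one by one."""
    def __init__(self, records):
        self._it = iter(records)

    def get_next(self):
        return next(self._it, EOP)

class Store:
    """Writes each input record to a sink and returns a null record."""
    def __init__(self, source, sink):
        self.source = source
        self.sink = sink

    def get_next(self):
        record = self.source.get_next()
        if record is EOP:
            return EOP  # assumption: end of input is propagated as-is
        # Side effect: persist the record (tab-separated, for illustration).
        self.sink.write("\t".join(map(str, record)) + "\n")
        return None  # the "null record": nothing flows further down
```

Because the store consumes every record it sees, it can sit anywhere a regular operator can, including inside a nested plan.
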
+ The local engine's implementation of the store is straightforward. It 
creates the StoreFunc and binds the output stream to it. 
+ The map reduce engine's implementation of the store needs to set up a few 
more things. It creates a copy of the job configuration and overwrites some key 
fields. It changes the output and temporary work output directory to point to a 
temporary path specific to the store being processed (see JobControlCompiler). 
It also sets PigOutputFormat as the output class as well as registering the 
actual store function that PigOutputFormat is going to use. Finally it creates 
a new record writer and output collector with these settings.
+ [[Anchor(Split_operator)]]
+ ===== Split operator =====
+ The split operator can now be used at execution time too. It used to always 
be removed by the compiler (and replaced by store-load combinations). So, 
here's how the split operator works:
+    * Every input record will be attached to each nested plan in sequence
+    * Until a plan is exhausted it will keep returning records from that plan
+    * Once EOP is returned the next plan will be pulled from.
+    * If all plans have been dealt with already the next input record is 
+    * If there are no more input records the EOP is passed on to the requester
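
The pull order described in this list can be sketched with a Python generator. This is a simplification under stated assumptions: each nested plan is modeled as a hypothetical function from one input record to an iterable of output records, the truncated step is read as "the next input record is fetched", and the generator ending plays the role of EOP.

```python
def run_split(records, nested_plans):
    """Yield outputs plan by plan for each input record."""
    for record in records:
        # Attach the record to each nested plan in sequence.
        for plan in nested_plans:
            # Drain this plan completely before moving to the next one.
            yield from plan(record)
    # Falling off the end signals end-of-pipeline to the requester.
```

With one plan that keeps only even numbers and a second plan that emits two records per input, the inputs `1, 2` would come out as `10, 11, 2, 20, 21`:

```python
keep_even = lambda r: [r] if r % 2 == 0 else []
expand = lambda r: [r * 10, r * 10 + 1]
print(list(run_split([1, 2], [keep_even, expand])))
```
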
+ [[Anchor(DemuxOperator)]]
+ ===== Demux Operator =====
+ The demux operator is used in combiners and reducers where the input is a mix 
of different split plans of the mapper. It will decide which of its nested 
plans a record belongs to and then attach it to that particular plan.
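
As a rough illustration, routing could look like the Python sketch below. How the operator actually identifies the owning plan is not stated here, so the sketch assumes each record arrives tagged with the index of its split branch; `demux` and the plan functions are hypothetical.

```python
def demux(tagged_records, nested_plans):
    """Route each (plan_index, record) pair to the matching nested plan."""
    for index, record in tagged_records:
        # Only the plan that owns this record gets to see it.
        yield from nested_plans[index](record)
```

Each nested plan is modeled as a function from one record to an iterable of outputs, so records from different branches never cross pipelines.
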
- [[Anchor(Local_Execution_engine_(Phase_1))]]
+ [[Anchor(Local_Execution_engine)]]
- ==== Local Execution engine (Phase 1) ====
+ ==== Local Execution Engine ====
- We need to make the local execution engine understand multiple store plans 
too. This might come for free or at least cheaply. The current local engine 
uses a call to store() on each physical store node to trigger the execution of 
the pipeline and write out the result. Split is realized as a blocking operator 
that will process the entire input and hands out an iterator to the tuples.
+ The local engine has not changed as much as the map reduce engine. The local 
engine executes the physical plan directly. The main changes were:
- This will give the right result, but re-processes all the dependencies once 
per store. We might in a later phase want to align this better with the hadoop 
engine and allow a non blocking split as well as a separating the storing of 
the records with the pulling from the pipeline.
+    * Allow for multiple stores in the physical plan
+    * Respect order of execution when there are edges between store and load 
+ The local engine will thus execute the physical plan as follows:
- [[Anchor(Implicit_split_insertion_(Phase_1))]]
- ==== Implicit split insertion (Phase 1) ====
- Implicit split insertion already exists as part of the optimizer. It 
translates any non-split logical node with multiple outputs into a split - 
split output combination. This is what we need to put the splits for the 
multi-query optimization in place. Right now, however, the parser is set up in 
a way that multiple stores will never end up in the same plan and thus the 
insertion doesn't happen for this case. 
- In short: Once we change the parser to look at the entire graph for a script 
instead of a store-by-store basis, we will get this for free. We might actually 
have to add logic to suppress this behavior in cases where the split will be 
slower than the dual processing.
+    * Get all the store operators with dependent load operators in a 
dependency order
+    * Execute these stores one by one
+    * Get all remaining stores and execute them all at once
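
The ordering above can be sketched in Python using the standard `graphlib` module (Python 3.9+). The function name and the `feeds` mapping, which records for each store the stores whose pipelines load its output, are hypothetical modeling choices rather than the local engine's data structures.

```python
from graphlib import TopologicalSorter

def local_execution_order(stores, feeds):
    """Return (stores to run one by one, stores to run together).

    stores: list of store names in the physical plan.
    feeds: dict mapping a store to the set of stores that load its output.
    """
    # Stores with dependent load operators must run first, in order.
    with_dependents = [s for s in stores if feeds.get(s)]
    predecessors = {
        s: {p for p in with_dependents if s in feeds.get(p, ())}
        for s in with_dependents
    }
    ordered = list(TopologicalSorter(predecessors).static_order())
    # Everything else has no downstream reader and can run in one batch.
    batch = [s for s in stores if s not in with_dependents]
    return ordered, batch
```

For a chain where `s1` feeds `s2` and `s2` feeds `s3`, with an independent `s4`, the sketch runs `s1` and then `s2` individually and leaves `s3` and `s4` for the final batch.
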
- [[Anchor(Store/Multiple_output_(Phase_2))]]
- ==== Store/Multiple output (Phase 2) ====
- If we put implicit splits in place and enhance splits to contain additional 
operators or even multiplex the split output within the same map reduce job, at 
some point either a map or a reduce job need to be able to produce multiple 
outputs. Currently there is a single output collector that will store the 
results in part-* files.
- Here are some options:
- [[Anchor(hadoop_0.19_supports_MultipleOutput)]]
- ===== hadoop 0.19 supports MultipleOutput =====
- Link:,%20java.lang.String,%20java.lang.Class,%20java.lang.Class,%20java.lang.Class)
- All the output will still be in the same directory, but the developer can 
give names for different sets of output data. So, in our case we might name the 
output "split1" and "split2" and the output would come out to be:
- {{{
- /outdir/split1-0000
- /outdir/split1-0001
- /outdir/split1-0002
- /outdir/split2-0000
- }}}
- [[Anchor(Side-Effect_files)]]
- ===== Side-Effect files =====
- Link:
- [[Anchor(Store_operator_(Phase_2))]]
- ===== Store operator (Phase 2) =====
- In the current system the store op is removed by JobControlCompiler and used 
to set up the output directory, etc. It holds information about how and where 
to store the files, but doesn't actually do anything in the pipeline. The logic 
for storing records in the map-reduce case, is handled by the map only and map 
reduce classes. The logic is simple: Whatever comes out of the pipeline will be 
transformed into a key/value pair collected using the output collector. 
- Using multiple output it would make sense to let the store operator do the 
actual collection. The job compiler would have the duty to configure multiple 
output streams and then assign the right collectors to the store operator. The 
actual mapper/reducers will still have the responsibility to run the pipeline 
and check for errors but the storing is handled by the store operator. Anything 
that is not stored and trickles out at the bottom of the pipeline goes into the 
standard collector. After the job is run we will have to move the files to the 
right directories.
- It seems to make more sense to use hadoop's multiple output functionality. 
Trying to build the same functionality with side-effect files will duplicate 
efforts made in hadoop 19. However, that way we might have to provide modes to 
run the queries differently depending on the version of hadoop.
- [[Anchor(Split_operator_(Phase_2))]]
- ==== Split operator (Phase 2) ====
- The goal is to make the split operator non-blocking. Instead of dumping a 
split immediately to disk we'll try to keep on going as long as possible. So 
the split operator would return the same record for as many times as there are 
children. This leaves you with multiple branches of the operator tree in the 
same map or reduce stage. These are going to be realized as nested plans inside 
- [[Anchor(Multiplex/Demultiplex_(Phase_3))]]
- ===== Multiplex/Demultiplex (Phase 3) =====
- The multiplex operation will serialize different nested plans of the split 
into the same output stream. If there are multiple distinct output streams it 
will add a key to distinguish between those. If all but one of the branches are 
terminated by a store, it will simple stream the single remaining one to the 
- Demuxing is done by the split operator followed by a special filter. The 
filter will only accept tuples for the particular pipeline.
- In phase 2, we'll simply terminate all but one split branch and store it into 
a tmp dir.
- [[Anchor(Combiner/Reducer_(Phase_3))]]
- ===== Combiner/Reducer (Phase 3) =====
- Multiplexing different branches into one stream allows us to run a combiner 
on the result - reducing the amount of data that will written. Otherwise we 
would have to dump everything in a map only job and then start one or more MR 
jobs to pick up the pieces.
- The current plan is to split the byte used for the join key between splits 
and joins. That leaves a nibble for each and reduces the number of joins and 
splits to 16.
- [[Anchor(MRCompiler_(Phase_2_and_3))]]
- ===== MRCompiler (Phase 2 and 3) =====
- The MR Compiler right now looks for splits, terminates the MR job at that 
point and connects the remaining operators via load and store.
- We'll add a new optimizer pass to look for these split scenarios. This gives 
us the ability to use the combiner plan information to make the determination 
of multiplexing or not (Phase 3) and also allows us more easily to switch back 
to the old style handling, if multiple outputs are not available.
- [[Anchor(Parallelism_(Phase_3))]]
- ===== Parallelism (Phase 3) =====
- If we multiplex outputs from different split branches we have to decide what 
to do with the requested parallelism: Max, sum or average?
