Dear Wiki user,
You have subscribed to a wiki page or wiki category on "Pig Wiki" for change
notification.
The following page has been changed by GuntherHagleitner:
Will result in a job that produces bar, then the mv and rm are executed.
Finally, another job
is run that will generate foo.
- == Phases ==
- These are the identified steps to get the proposal implemented.
- === Phase 1 ===
- Phase one is about getting some infrastructural things in place.
- * Batch execution (instead of single store execution)
- * Merge logical plans into single
- * Updated explain/illustrate
- * Change local and hadoop engine to execute multiple store plans (with
store-load per split).
- At the end of phase one, we'll have implicit splits. But the MR plan will be
the same as if the user made the splits explicit.
- === Phase 2 ===
- Phase two is about getting the MR Compiler to agree to this.
- * Allow multiple stores in single job
- * Merge multiple plans into the split operator
- * Terminate all but one with stores
- === Phase 3 ===
- Phase three is about using the combiner/reducer on multiple split branches in
the same job
- * Merge Combiner/Reducer plans
- * Put in logic to decide when to multiplex pipelines
== Internal Changes ==
- ==== GruntParser/PigServer (Phase 1) ====
+ ==== GruntParser/PigServer ====
The parser currently uses a bottom-up approach. When it sees a store (dump,
explain), it goes bottom up and generates the plan that needs to happen for
this particular store. In order to optimize the multi-query example, however,
we need to look at the entire graph for a script (interactive mode is handled
separately).
- The highlevel changes to the parser/server are:
+ The high-level changes to the parser/server are:
* Not execute the plan when we see a store or dump
* Alter the already existing merge functionality to allow intersecting
graphs to be joined into a single logical plan.
@@ -539, +508 @@
If the multi-query optimization is turned off, all graphs will be generated
as in interactive mode, which is how we revert to the old behavior.
+ The merging of the different logical plans is done in the OperatorPlan, where
merges can now either check for disjoint graphs or merge them with overlaps.
Merged plans are later passed on to the implicit split inserter at which point
all the overlapping operators from the merge will result in an explicit split.
Finally, the store-load handling is done in the pig server. It will either
transform the plan or add a store-load connection. Absolute filenames will be
available, since the QueryParser now translates them when it sees them.
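The merge-and-split behavior can be sketched in a few lines. This is an
illustrative Python sketch; the plan representation and the function names
(merge_plans, insert_implicit_splits) are hypothetical, not Pig's actual
OperatorPlan API:

```python
def merge_plans(plan_a, plan_b):
    """Merge two plans given as {operator: [successors]} dicts.

    Overlapping operators are unified instead of duplicated."""
    merged = {op: list(succs) for op, succs in plan_a.items()}
    for op, succs in plan_b.items():
        merged.setdefault(op, [])
        for s in succs:
            if s not in merged[op]:
                merged[op].append(s)
    return merged

def insert_implicit_splits(plan):
    """Any operator with multiple successors is rewritten to feed a split."""
    out = {}
    for op, succs in plan.items():
        if len(succs) > 1:
            split = "split(%s)" % op
            out[op] = [split]
            out[split] = list(succs)
        else:
            out[op] = list(succs)
    return out

# Two queries over the same load: LOAD -> FILTER -> STORE1 and LOAD -> STORE2.
a = {"load": ["filter"], "filter": ["store1"], "store1": []}
b = {"load": ["store2"], "store2": []}
merged = insert_implicit_splits(merge_plans(a, b))
```

The overlapping "load" operator appears only once in the merged plan, and
because it now has two successors the implicit split inserter turns it into
an explicit split feeding both branches.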
The grunt parser makes use of the new PigServer APIs. It will use the batch
API to parse and execute scripts. Since scripts can be nested, the script
execution and the stack of graphs in the PigServer are closely related.
- ==== Explain (Phase 1) ====
+ ==== Explain ====
As described above, the changes are:
@@ -564, +533 @@
CoGroups are special in that their nested plans are connected to input
operators. This is modeled as subgraphs as well and the subgraphs are connected
to the respective input operator.
+ [[Anchor(Map Reduce Engine)]]
+ ==== Map Reduce Engine ====
+ Let's first look at how a multi-query is processed in the hadoop case. We
have already discussed how the
+ logical plan is generated, driven by the PigServer and GruntParser. The next
step is translation of the logical plan
+ to the physical plan. This has not changed, except that in the multi-query
case it has to carry the store-to-load connections through to the physical
plan.
+ ===== MRCompiler =====
+ After that, the MRCompiler takes the physical plan and breaks it into a plan
of map-reduce operators. The handling of split operators has not changed. What
happens is:
+ When a split is encountered, it is removed from the plan and replaced by a
store to a temporary path with BinStorage.
+ The split branches are placed in new map-reduce operators that load from
that temporary path.
+ Some minor changes to the MRCompiler are:
+ * Edges from store to load operators are removed and their respective
map-reduce operators are connected. This ensures that the store job runs
before the load job.
+ * Jobs that result from a split are marked as splitters and splittees, so
that the optimizer can identify them later.
+ ===== MultiQueryOptimizer =====
+ After the compiler the optimizers are run. The last one in the chain is the
MultiQueryOptimizer. This optimizer looks at all splitters and splittees and
merges them recursively, producing what was described above in
ExecutionPlans. Since this step is run after the combiner optimizer, it can
also go ahead and merge combiners if necessary. If the optimizer cannot
merge, it will keep the existing store that the split produced in the
MRCompiler.
+ The merging of splittees into a splitter consists of:
+ * Creating a split operator in the map or reduce plan and setting the
splittee plans as nested plans of the split
+ * If it needs to merge combiners, it will introduce a Demux operator to
route the input from mixed split branches in the mapper to the right combine
plan. The separate combiner plans are the nested plans of the Demux operator
+ * If a map-reduce operator does not have a combiner, it will insert a
FakeLocalRearrange operator to simply route the input through
+ * If it needs to merge reduce plans, it will do so using the Demux
operator the same way the combiner is merged
+ Note: As an end result, this merging produces Split or Demux operators
with multiple stores tucked away in their nested plans.
+ There are two more minor optimizers:
+ * NoopFilterRemover will remove constant-true filters that splits produce
+ * NoopStoreRemover will remove implicit stores to temporary files, if
there is a user-initiated store that contains the same information
+ ===== MapReduceLauncher =====
+ The map-reduce launcher receives the map-reduce operator plan from the
compilation stage. It uses the JobControlCompiler to compile map-reduce
operators into hadoop job control objects and submits them to hadoop for
execution.
+ The biggest change stems from the fact that with the current hadoop system
we need to store multiple outputs from a single job in a temp directory first
and then have to move them to the real location. Map-reduce operators that
are connected in the graph mean that the successor needs to read files that
the parent produces. In order to do that, we need to submit the graph in
several steps, and after each one we need to move the files to their final
location. So the overall job submission works like this:
+ * Remove all root operators
+ * Compile these operators into jobs
+ * Submit jobs to hadoop
+ * Move result files or collect failure
+ * Repeat unless map-reduce plan is empty
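The staged submission loop above can be sketched as follows. The plan is a
{job: set(prerequisite jobs)} dependency map, and the run_jobs/move_results
callbacks are illustrative stand-ins for the JobControl interaction:

```python
def submit_in_stages(plan, run_jobs, move_results):
    """Submit the map-reduce plan in waves of root operators."""
    while plan:
        # Roots = jobs with no unfinished prerequisites.
        roots = [j for j, deps in plan.items() if not deps]
        if not roots:
            raise RuntimeError("cycle in map-reduce plan")
        failures = run_jobs(roots)      # compile and submit this batch
        if failures:
            return failures             # collect failure and stop
        move_results(roots)             # move files to their final location
        for j in roots:                 # remove finished roots from the plan
            del plan[j]
        for deps in plan.values():
            deps.difference_update(roots)
    return []

order = []
plan = {"A": set(), "B": {"A"}, "C": {"A"}, "D": {"B", "C"}}
submit_in_stages(plan, lambda js: order.append(sorted(js)) or [], lambda js: None)
```

A depends on nothing, so it runs alone; B and C can run as one batch once A's
files are in place; D runs last.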
+ ===== JobControlCompiler =====
+ The JobControlCompiler distinguishes between multi-store and single-store
map-reduce operators. Single-store operators are processed the same way as
before: the store is removed and the job configuration is set up to put the
records that come out of the pipeline in the right files.
+ Multi-store plans are handled differently. The compiler sets the output
path to a newly created temporary directory. It also leaves the store
operators in the plan; these operators will, at execution time, create
subdirectories of the temporary path and direct records there.
+ Let's assume we have 2 stores in the plan to directories "foo" and
"/out/bar". After running the job, the temporary directory will contain one
subdirectory per store.
+ The JobControlCompiler will then as a final step move the result files to
their final locations.
+ [[Anchor(Store Operator)]]
+ ===== Store Operator =====
+ Finally, after the MapReduceLauncher has submitted the jobs, execution
starts on the backend.
+ The store operator was changed to deal with multiple outputs from a single
map-reduce job, as well as to enable the local engine to handle multiple
stores in the same plan.
+ The operator itself has a store implementation that is instantiated by the
execution engine that handles the output. Store has a "getNext" function that
stores the input record and returns a null record. That basically means that
stores can be placed into an execution pipeline as an operator with a
side-effect that will consume all input records.
+ The local engine's implementation of the store is straightforward. It
creates the StoreFunc and binds the output stream to it.
+ The map-reduce engine's implementation of the store needs to set up a few
more things. It creates a copy of the job configuration and overwrites some
key fields. It changes the output and temporary work output directory to
point to a temporary path specific to the store being processed (see
JobControlCompiler). It also sets PigOutputFormat as the output class and
registers the actual store function that PigOutputFormat is going to use.
Finally, it creates a new record writer and output collector with these
settings.
+ ===== Split operator =====
+ The split operator can now be used at execution time too. It used to always
be removed by the compiler (and replaced by store-load combinations). So,
here's how the split operator works:
+ * Every input record will be attached to each nested plan in sequence
+ * Until a plan is exhausted, it will keep returning records from that plan
+ * Once EOP is returned, the next plan will be pulled from
+ * If all plans have been dealt with, the next input record is fetched and
attached
+ * If there are no more input records, the EOP is passed on to the requester
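The behavior above can be sketched as follows (illustrative, not the actual
POSplit code). Nested plans are modeled as callables mapping one input record
to a list of output records:

```python
EOP = object()  # end-of-processing marker

class SplitOp:
    def __init__(self, source, nested_plans):
        self.source = source       # upstream record iterator
        self.plans = nested_plans  # callables: record -> list of outputs
        self.pending = []          # outputs left from the current record

    def get_next(self):
        while not self.pending:
            record = next(self.source, EOP)
            if record is EOP:
                return EOP         # no more input: pass EOP on
            # Attach the record to each nested plan in sequence and queue
            # every output the plans produce, plan by plan.
            for plan in self.plans:
                self.pending.extend(plan(record))
        return self.pending.pop(0)

double = lambda r: [r * 2]
keep_positive = lambda r: [r] if r > 0 else []
split = SplitOp(iter([1, -1]), [double, keep_positive])
out = []
while (r := split.get_next()) is not EOP:
    out.append(r)
```

For the record 1, both plans produce output (2 from the first plan, then 1
from the second); for -1, only the first plan does.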
+ ===== Demux Operator =====
+ The demux operator is used in combiners and reducers where the input is a
mix of different split plans of the mapper. It will decide which of its
nested plans a record belongs to and then attach it to that particular plan.
- ==== Local Execution engine (Phase 1) ====
+ ==== Local Execution Engine ====
- We need to make the local execution engine understand multiple store plans
too. This might come for free or at least cheaply. The current local engine
uses a call to store() on each physical store node to trigger the execution of
the pipeline and write out the result. Split is realized as a blocking operator
that will process the entire input and hands out an iterator to the tuples.
+ The local engine has not changed as much as the map reduce engine. The local
engine executes the physical plan directly. The main changes were:
- This will give the right result, but re-processes all the dependencies once
per store. We might in a later phase want to align this better with the hadoop
engine and allow a non blocking split as well as a separating the storing of
the records with the pulling from the pipeline.
+ * Allow for multiple stores in the physical plan
+ * Respect order of execution when there are edges between store and load
+ The local engine will thus execute the physical plan as follows:
- ==== Implicit split insertion (Phase 1) ====
- Implicit split insertion already exists as part of the optimizer. It
translates any non-split logical node with multiple outputs into a split -
split output combination. This is what we need to put the splits for the
multi-query optimization in place. Right now, however, the parser is set up in
a way that multiple stores will never end up in the same plan and thus the
insertion doesn't happen for this case.
- In short: Once we change the parser to look at the entire graph for a script
instead of a store-by-store basis, we will get this for free. We might actually
have to add logic to suppress this behavior in cases where the split will be
slower than the dual processing.
+ * Get all the store operators that have dependent load operators
+ * Execute these stores one by one
+ * Get all remaining stores and execute them all at once
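The ordering above can be sketched like this. Stores whose output is loaded
elsewhere in the plan run first, one by one, so their files exist before they
are read; the rest run together afterwards. The function and names are
illustrative, not the local engine's actual code:

```python
def execution_order(stores, feeds_a_load):
    """stores: store operators in the plan.
    feeds_a_load: the subset whose output is read by a load operator."""
    first = [s for s in stores if s in feeds_a_load]     # run one by one
    rest = [s for s in stores if s not in feeds_a_load]  # run all at once
    return first, rest

first, rest = execution_order(["s1", "s2", "s3"], {"s2"})
```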
- ==== Store/Multiple output (Phase 2) ====
- If we put implicit splits in place and enhance splits to contain additional
operators or even multiplex the split output within the same map reduce job, at
some point either a map or a reduce job need to be able to produce multiple
outputs. Currently there is a single output collector that will store the
results in part-* files.
- Here are some options:
- ===== hadoop 0.19 supports MultipleOutput =====
- All the output will still be in the same directory, but the developer can
give names for different sets of output data. So, in our case we might name the
output "split1" and "split2" and the output would come out to be:
- ===== Side-Effect files =====
- ===== Store operator (Phase 2) =====
- In the current system the store op is removed by JobControlCompiler and used
to set up the output directory, etc. It holds information about how and where
to store the files, but doesn't actually do anything in the pipeline. The logic
for storing records in the map-reduce case, is handled by the map only and map
reduce classes. The logic is simple: Whatever comes out of the pipeline will be
transformed into a key/value pair collected using the output collector.
- Using multiple output it would make sense to let the store operator do the
actual collection. The job compiler would have the duty to configure multiple
output streams and then assign the right collectors to the store operator. The
actual mapper/reducers will still have the responsibility to run the pipeline
and check for errors but the storing is handled by the store operator. Anything
that is not stored and trickles out at the bottom of the pipeline goes into the
standard collector. After the job is run we will have to move the files to the
- It seems to make more sense to use hadoop's multiple output functionality.
Trying to build the same functionality with side-effect files will duplicate
efforts made in hadoop 19. However, that way we might have to provide modes to
run the queries differently depending on the version of hadoop.
- ==== Split operator (Phase 2) ====
- The goal is to make the split operator non-blocking. Instead of dumping a
split immediately to disk we'll try to keep on going as long as possible. So
the split operator would return the same record for as many times as there are
children. This leaves you with multiple branches of the operator tree in the
same map or reduce stage. These are going to be realized as nested plans inside
- ===== Multiplex/Demultiplex (Phase 3) =====
- The multiplex operation will serialize different nested plans of the split
into the same output stream. If there are multiple distinct output streams it
will add a key to distinguish between those. If all but one of the branches are
terminated by a store, it will simple stream the single remaining one to the
- Demuxing is done by the split operator followed by a special filter. The
filter will only accept tuples for the particular pipeline.
- In phase 2, we'll simply terminate all but one split branch and store it into
a tmp dir.
- ===== Combiner/Reducer (Phase 3) =====
- Multiplexing different branches into one stream allows us to run a combiner
on the result - reducing the amount of data that will written. Otherwise we
would have to dump everything in a map only job and then start one or more MR
jobs to pick up the pieces.
- The current plan is to split the byte used for the join key between splits
and joins. That leaves a nibble for each and reduces the number of joins and
splits to 16.
- ===== MRCompiler (Phase 2 and 3) =====
- The MR Compiler right now looks for splits, terminates the MR job at that
point and connects the remaining operators via load and store.
- We'll add a new optimizer pass to look for these split scenarios. This gives
us the ability to use the combiner plan information to make the determination
of multiplexing or not (Phase 3) and also allows us more easily to switch back
to the old style handling, if multiple outputs are not available.
- ===== Parallelism (Phase 3) =====
- If we multiplex outputs from different split branches we have to decide what
to do with the requested parallelism: Max, sum or average?