The following page has been changed by GuntherHagleitner:
= Multi-query Performance =
Currently, scripts with multiple store commands can result in a lot of
duplicated work. The idea for avoiding the duplication is described here:
== External ==
=== Use cases: ===
==== Explicit/implicit split: ====
There might be cases in which you want to do different processing on separate
parts of the same data stream. Like so:
A = load ...
split A' into B if ..., C if ...
store B' ...
store C' ...

or, written with filters:

A = load ...
B = filter A' ...
C = filter A' ...
store B' ...
store C' ...
In the current system the first example will dump A' to disk and then start
jobs for B' and C'. In the second example Pig will execute all the dependencies
of B' and store it, and then execute all the dependencies of C' and store it.
Both of the above are equivalent, but the performance will be different.
Here's what we plan to do to increase the performance:
* In the second case we will add an implicit split to transform the query to
case number one. That will eliminate the processing of A' multiple times.
* Make the split non-blocking and allow processing to continue. This will
help reduce the amount of data that has to be stored right at the split.
* Allow multiple outputs from a job. This way we can store some results as a
side-effect. This is also necessary to make the previous item work.
* Allow multiple split branches to be carried on to the combiner/reducer.
This will reduce the amount of IO again in the case where multiple branches in
the split can benefit from a combiner run.
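To make the duplicated work concrete, here is a minimal sketch that counts how often each handle gets evaluated when stores are run one by one versus when the whole script is considered at once. The `PLAN` dictionary and `run_store` helper are hypothetical stand-ins for illustration, not Pig's actual data structures.

```python
from collections import Counter

# Hypothetical plan: handle -> list of input handles.
PLAN = {
    "A": [],        # load
    "A'": ["A"],    # some processing of A
    "B'": ["A'"],   # filter A' ...
    "C'": ["A'"],   # filter A' ...
}

def run_store(handle, counts, done=None):
    """Evaluate everything `handle` depends on, counting each evaluation.

    If `done` is given, handles already computed in this run are shared
    instead of being recomputed.
    """
    if done is not None and handle in done:
        return
    for parent in PLAN[handle]:
        run_store(parent, counts, done)
    counts[handle] += 1
    if done is not None:
        done.add(handle)

# Store-by-store execution: each store re-runs its full dependency chain.
separate = Counter()
for store in ("B'", "C'"):
    run_store(store, separate)

# Whole-script execution: common ancestors are computed once and shared.
combined = Counter()
done = set()
for store in ("B'", "C'"):
    run_store(store, combined, done)

print(dict(separate))
print(dict(combined))
```

In the store-by-store run the shared ancestors are evaluated once per store; in the combined run every handle is evaluated exactly once.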
==== Storing intermediate results ====
Sometimes people will store intermediate results.
If the script doesn't re-load A' for the processing of A'' the steps above A'
will be duplicated. This is basically a special case of Number 2 above, so the
same steps are recommended. With the proposed changes the script will basically
process A'' and dump A' as a side-effect. Which is what the user probably
wanted to begin with.
=== Why? ===
Pig's philosophy is: Optimize it yourself, why don't you.
* Implicit splits: It's probably what you expect when you use the same
handle in different stores.
* Store/Load vs Split: When optimizing, it's a reasonable assumption that
splits are faster than load/store combinations.
* Side-effects: There is no way right now to make use of this.
=== Changes ===
==== Execution in batch mode ====
Batch mode is entered when Pig is given a script to execute. Interactive mode
is on the grunt shell ("grunt:>"). Right now there isn't much difference
between them. In order for us to optimize the multi-query case, we'll need to
distinguish the two more.
Right now whenever the parser sees a store (or dump, explain, illustrate or
describe) it will kick off the execution of that part of the script. Part of
this proposal is that in batch mode, we parse the entire script first and see
if we can combine things to reduce the overall amount of work that needs to be
done. Only after that will the execution start.
The following changes are proposed (in batch):
* Store will not trigger an immediate execution. The entire script is
considered before the execution starts.
* Explicit splits will be put in places where a handle has multiple
children. If the user wants to explicitly force re-computation of common
ancestors she has to provide multiple scripts.
* Multiple split branches/stores in the script will be combined into the
same job, if possible. Again, using multiple scripts is the way to go to avoid
this (if that is desired).
For diagnostic operators there are some problems with this:
* They work on handles, which only gives you a slice of the entire script
execution at a time. What's more, at the point where they occur in a
script they might not give you an accurate picture of the situation, since
the execution plans might change once the entire script is handled.
* They change the logical tree. This means that we need to clone the tree
before we run them - something that we want to avoid in batch execution.
The proposal therefore is:
* Have Pig in batch mode ignore explain, dump, illustrate and describe.
* Add a load command to the shell to execute a script in interactive mode.
* Add scripts as a target (in addition to handles) to some diagnostic operators.
* Add dot as an output type to explain (a graphical explanation of the graph
will make multi-query explains more understandable.)
That means that while someone is developing a Pig script they can put any
diagnostic operator into the script and then go to the grunt shell and load the
script. The statement will be executed and give you some information about that
part of the script. When a script is loaded, the user will also be able to
refer to any handles defined in the script on the shell.
Finally, when the script is ready the user can run the same script in batch and
all the diagnostic operators are ignored.
==== Load ====
(See https://issues.apache.org/jira/browse/PIG-574 - this is basically the same
as requested there)
The new command has the format:
load <script name>
Which will run the script in interactive mode.
==== Explain ====
Changes to the command:
explain <script>||<handle> [using text||dot] [into <path>]
* Explain is not executed in batch mode.
* If explain is given a script, it will output the entire execution graph
(logical, physical, MR + moving result files)
* Text will give what we have today, dot will output a format that can be
passed to dot for graphical display.
* In Text mode, multiple outputs (splits) will be broken out in sections.
* Default (no using clause): Text
* Will generate logical.[txt||dot], physical.[txt||dot], mapred.[txt||dot]
in the specified directory.
* Default (no path given): Stdout
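As an illustration of what a dot rendering of a plan could look like, here is a minimal sketch that turns a list of plan edges into Graphviz text. The `plan_to_dot` function and the node labels are hypothetical; the actual layout of Pig's explain output is not specified here.

```python
def plan_to_dot(edges, name="plan"):
    """Render (parent, child) edges as a Graphviz digraph."""
    lines = [f"digraph {name} {{"]
    for parent, child in edges:
        lines.append(f'  "{parent}" -> "{child}";')
    lines.append("}")
    return "\n".join(lines)

# Hypothetical logical plan for a script with two stores sharing A'.
edges = [
    ("load A", "A'"),
    ("A'", "filter B'"),
    ("A'", "filter C'"),
    ("filter B'", "store B'"),
    ("filter C'", "store C'"),
]
dot_text = plan_to_dot(edges, "logical")
print(dot_text)
```

Saved as logical.dot, text like this can be rendered with Graphviz, for example `dot -Tpng logical.dot -o logical.png`.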
==== Illustrate ====
Changes to the command:
illustrate <script>||<handle> [into <file>]
* Illustrate is not executed in batch mode.
* If illustrate is given a script, it will output the entire execution graph
(logical, physical, MR + moving result files)
* Will write the illustrate output into the specified file.
* Default: Stdout
== Phases ==
These are the identified steps to get the proposal implemented.
=== Phase 1 ===
Phase one is about getting some infrastructural things in place.
* Batch execution (instead of single store execution)
* Merge logical plans into a single plan
* Updated explain/illustrate
* Change local and hadoop engine to execute multiple store plans (with
store-load per split).
At the end of phase one, we'll have implicit splits. But the MR plan will be
the same as if the user made the splits explicit.
=== Phase 2 ===
Phase two is about getting the MR Compiler to agree to this.
* Allow multiple stores in single job
* Merge multiple plans into the split operator
* Terminate all but one with stores
=== Phase 3 ===
Phase three is about using the combiner/reducer on multiple split branches in
the same job
* Merge Combiner/Reducer plans
* Put in logic to decide when to multiplex pipelines
== Internal Changes ==
==== Grunt parser (Phase 1) ====
The parser currently uses a bottom up approach. When it sees a store (dump,
explain), it goes bottom up and generates the plan that needs to happen for
this particular store. In order to optimize the multi-query example, we need,
however, a peek on the entire graph for a script (interactive mode can be
In order to do this we will change the batch mode of the parser to:
* Not execute the plan when we see a store (or dump, illustrate, describe,
explain - which will be ignored)
* Alter the already existing merge functionality to allow intersecting
graphs to be joined into a single logical plan.
* Wait until the entire script is parsed and merged before sending the plan
on to do validation, optimization, etc.
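The batch-mode behavior listed above can be sketched roughly as follows. `BatchScript` and its methods are made-up names for illustration only; they do not correspond to classes in the Grunt parser.

```python
class BatchScript:
    """Toy stand-in for the batch-mode parser (hypothetical, not Pig code)."""

    DIAGNOSTICS = {"dump", "explain", "illustrate", "describe"}

    def __init__(self):
        self.edges = set()    # merged logical plan as (parent, child) edges
        self.stores = []      # stores seen so far, in script order
        self.executions = 0   # how many times execution was triggered

    def define(self, handle, *inputs):
        """Record a handle definition; intersecting graphs merge via the set."""
        for parent in inputs:
            self.edges.add((parent, handle))

    def statement(self, op, handle):
        if op in self.DIAGNOSTICS:
            return                       # ignored in batch mode
        if op == "store":
            self.stores.append(handle)   # remembered, but not executed yet

    def finish(self):
        """Called once at end of script: hand over the single merged plan."""
        self.executions += 1
        return sorted(self.edges), list(self.stores)

script = BatchScript()
script.define("A'", "A")
script.define("B'", "A'")
script.statement("store", "B'")
script.statement("explain", "B'")   # has no effect in batch mode
script.define("C'", "A'")
script.statement("store", "C'")
plan, stores = script.finish()
```

Both stores end up in one plan that shares the path through A', and execution is triggered once, after the last line of the script.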
The new "load" command will simply feed all the lines of a script through the
==== Explain, Dump, Describe and Illustrate (Phase 1) ====
As described above the changes are:
* Ignore these operations in batch mode
* Add options to explain and illustrate to work on a script file as well as a handle.
* Add the ability to print plans as dot files and to write explain and
illustrate output to files.
There will be some work to nicely represent the graphs resulting from explain
in text form. Right now operators with multiple outputs will result in the
ancestor tree being duplicated for each output. It might be nicer to show the
ancestors once and mark the other places as copies of that one.
==== Local Execution engine (Phase 1) ====
We need to make the local execution engine understand multiple store plans too.
This might come for free or at least cheaply. The current local engine uses a
call to store() on each physical store node to trigger the execution of the
pipeline and write out the result. Split is realized as a blocking operator
that processes the entire input and hands out an iterator to the tuples.
This will give the right result, but re-processes all the dependencies once per
store. We might in a later phase want to align this better with the hadoop
engine and allow a non-blocking split as well as separating the storing of
the records from the pulling from the pipeline.
==== Implicit split insertion (Phase 1) ====
Implicit split insertion already exists as part of the optimizer. It translates
any non-split logical node with multiple outputs into a split - split output
combination. This is what we need to put the splits for the multi-query
optimization in place. Right now, however, the parser is set up in a way that
multiple stores will never end up in the same plan and thus the insertion
doesn't happen for this case.
In short: Once we change the parser to look at the entire graph for a script
instead of a store-by-store basis, we will get this for free. We might actually
have to add logic to suppress this behavior in cases where the split will be
slower than the dual processing.
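A simplified sketch of the insertion step, assuming the merged plan is available as a list of (parent, child) edges. The function name and the `split(...)` node naming are hypothetical, and the separate split output nodes mentioned above are left out for brevity.

```python
from collections import defaultdict

def insert_implicit_splits(edges):
    """Return new edges where every multi-child non-split node feeds a split."""
    children = defaultdict(list)
    for parent, child in edges:
        children[parent].append(child)

    result = []
    for parent, kids in children.items():
        if len(kids) > 1 and not parent.startswith("split"):
            split = f"split({parent})"   # hypothetical name for the new node
            result.append((parent, split))
            result.extend((split, kid) for kid in kids)
        else:
            result.extend((parent, kid) for kid in kids)
    return result

# Merged plan for a script that stores both B' and C', each derived from A'.
edges = [("A", "A'"), ("A'", "B'"), ("A'", "C'")]
with_splits = insert_implicit_splits(edges)
print(with_splits)
```

With store-by-store parsing the plan for each store only ever contains one child of A', so the `len(kids) > 1` condition never fires; on the merged plan it does.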
==== Store/Multiple output (Phase 2) ====
If we put implicit splits in place and enhance splits to contain additional
operators or even multiplex the split output within the same map reduce job, at
some point either a map or a reduce job needs to be able to produce multiple
outputs. Currently there is a single output collector that will store the
results in part-* files.
Here are some options:
===== hadoop 0.19 supports MultipleOutput =====
All the output will still be in the same directory, but the developer can give
names to different sets of output data. So, in our case we might name the
output "split1" and "split2" and the output would come out to be:
===== Side-Effect files =====
===== Store operator (Phase 2) =====
In the current system the store op is removed by JobControlCompiler and used to
set up the output directory, etc. It holds information about how and where to
store the files, but doesn't actually do anything in the pipeline. The logic
for storing records in the map-reduce case is handled by the map-only and
map-reduce classes. The logic is simple: Whatever comes out of the pipeline will be
transformed into a key/value pair and collected using the output collector.
With multiple outputs it would make sense to let the store operator do the
actual collection. The job compiler would have the duty to configure multiple
output streams and then assign the right collectors to the store operator. The
actual mapper/reducers will still have the responsibility to run the pipeline
and check for errors but the storing is handled by the store operator. Anything
that is not stored and trickles out at the bottom of the pipeline goes into the
standard collector. After the job is run we will have to move the files to the
It seems to make more sense to use hadoop's multiple output functionality.
Trying to build the same functionality with side-effect files will duplicate
efforts made in hadoop 0.19. However, that way we might have to provide modes to
run the queries differently depending on the version of hadoop.
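The division of labor described for the store operator might look roughly like this. It is a sketch under assumptions: `StoreOp`, `run_pipeline` and the list-based collectors are invented for illustration and are not Hadoop or Pig APIs; only the output names "split1" and "split2" come from the text above.

```python
class StoreOp:
    """Toy store operator that collects records itself (hypothetical)."""

    def __init__(self, name, collector):
        self.name = name
        self.collector = collector   # a list standing in for an output stream

    def push(self, record):
        self.collector.append(record)

def run_pipeline(records, branches, default_collector):
    """Run every record through every branch.

    branches: list of (predicate, StoreOp or None). A branch ending in a
    StoreOp writes to its own output; a branch without a store lets the
    record trickle out into the standard collector.
    """
    for record in records:
        for predicate, store in branches:
            if predicate(record):
                if store is not None:
                    store.push(record)
                else:
                    default_collector.append(record)

# The job compiler's role: configure the outputs and assign collectors.
outputs = {"split1": [], "split2": []}
standard = []
branches = [
    (lambda r: r % 2 == 0, StoreOp("B'", outputs["split1"])),
    (lambda r: r % 3 == 0, StoreOp("C'", outputs["split2"])),
    (lambda r: r > 4, None),
]
run_pipeline(range(1, 7), branches, standard)
```

The mapper or reducer loop stays responsible for driving the pipeline, while each store decides where its own records go.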
==== Split operator (Phase 2) ====
The goal is to make the split operator non-blocking. Instead of dumping a split
immediately to disk we'll try to keep on going as long as possible. So the
split operator would return the same record for as many times as there are
children. This leaves you with multiple branches of the operator tree in the
same map or reduce stage. These are going to be realized as nested plans inside
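A minimal sketch of the non-blocking idea, assuming each branch can be modeled as a function from one record to a list of outputs. The function and its signature are hypothetical.

```python
def non_blocking_split(records, nested_plans):
    """Feed each record to every nested plan without buffering the input.

    nested_plans: list of functions mapping a record to a list of outputs
    (an empty list means the branch dropped the record).
    Yields (branch_index, output) as soon as each record is processed.
    """
    for record in records:
        for index, plan in enumerate(nested_plans):
            for out in plan(record):
                yield index, out

plans = [
    lambda r: [r] if r > 2 else [],   # branch 0: a filter
    lambda r: [r * 10],               # branch 1: a transformation
]
results = list(non_blocking_split(iter([1, 2, 3]), plans))
print(results)
```

Because the split is a generator over an iterator, nothing is written to disk or held in memory beyond the current record, which is the contrast with the blocking split.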
===== Multiplex/Demultiplex (Phase 3) =====
The multiplex operation will serialize different nested plans of the split into
the same output stream. If there are multiple distinct output streams it will
add a key to distinguish between those. If all but one of the branches are
terminated by a store, it will simply stream the single remaining one to the
Demuxing is done by the split operator followed by a special filter. The filter
will only accept tuples for the particular pipeline.
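The tagging and filtering could be sketched like this. The functions are hypothetical, and using the branch position as the key is an assumption made for the example.

```python
def multiplex(branch_outputs):
    """Serialize several branches into one stream, tagging each tuple
    with a key that identifies its branch."""
    stream = []
    for key, tuples in enumerate(branch_outputs):
        for t in tuples:
            stream.append((key, t))
    return stream

def demultiplex(stream, key):
    """The 'special filter': keep only the tuples of one pipeline."""
    return [t for k, t in stream if k == key]

stream = multiplex([[("a", 1), ("b", 2)], [("a", 10)]])
branch0 = demultiplex(stream, 0)
branch1 = demultiplex(stream, 1)
```

Every downstream pipeline sees the full stream but only accepts tuples carrying its own key.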
In phase 2, we'll simply terminate all but one split branch and store it into a
===== Combiner/Reducer (Phase 3) =====
Multiplexing different branches into one stream allows us to run a combiner on
the result - reducing the amount of data that will be written. Otherwise we would
have to dump everything in a map only job and then start one or more MR jobs to
pick up the pieces.
The current plan is to split the byte used for the join key between splits and
joins. That leaves a nibble for each and reduces the number of joins and splits
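A nibble is four bits, so each of the two indexes can take 16 distinct values. As a sketch of the packing, assuming (arbitrarily) that the split index goes in the high nibble and the join index in the low nibble:

```python
def pack_index(split_id, join_id):
    """Pack two 4-bit values into one byte.

    Assumed layout: high nibble = split index, low nibble = join index.
    """
    if not (0 <= split_id < 16 and 0 <= join_id < 16):
        raise ValueError("each nibble holds values 0..15")
    return (split_id << 4) | join_id

def unpack_index(byte):
    """Return (split_id, join_id) from a packed byte."""
    return byte >> 4, byte & 0x0F

packed = pack_index(3, 9)
print(packed, unpack_index(packed))
```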
===== MRCompiler (Phase 2 and 3) =====
The MR Compiler right now looks for splits, terminates the MR job at that point
and connects the remaining operators via load and store.
We'll add a new optimizer pass to look for these split scenarios. This gives us
the ability to use the combiner plan information to decide whether to
multiplex or not (Phase 3) and also allows us to switch back more easily to
the old style handling, if multiple outputs are not available.
===== Parallelism (Phase 3) =====
If we multiplex outputs from different split branches we have to decide what to
do with the requested parallelism: Max, sum or average?
==== Diamond problem (Phase 3) ====
What happens when different split plans come back together?
Should come for free. Need to make sure unions can handle multiple split