[[Anchor(Multi-query_Performance)]]
= Multi-query Performance =

Currently, scripts with multiple store commands can result in a lot of duplicated work. The idea of how to avoid the duplication is described here:
https://issues.apache.org/jira/browse/PIG-627

[[Anchor(External)]]
== External ==

[[Anchor(Use_cases:)]]
=== Use cases: ===

[[Anchor(Explicit/implicit_split:)]]
==== Explicit/implicit split: ====

There may be cases in which you want to do different processing on separate parts of the same data stream, like so:

{{{
A = load ...
...
split A' into B if ..., C if ...
...
store B' ...
store C' ...
}}}

or

{{{
A=load ...
...
B=filter A' ...
C=filter A' ...
...
store B' ...
store C' ...
}}}

In the current system the first example will dump A' to disk and then start jobs for B' and C'. In the second example Pig will execute all the dependencies of B' and store it, and then execute all the dependencies of C' and store it.

Both of the above are semantically equivalent, but their performance will be different.

Here's what we plan to do to improve performance:

   * In the second case we will add an implicit split to transform the query into the first case (a concrete sketch follows this list). That eliminates processing A' multiple times.
   * Make the split non-blocking and allow processing to continue. This will help reduce the amount of data that has to be stored right at the split.
   * Allow multiple outputs from a job. This way we can store some results as a side-effect. This is also necessary to make the previous item work.
   * Allow multiple split branches to be carried on to the combiner/reducer. This will reduce the amount of IO again in the case where multiple branches in the split can benefit from a combiner run.
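
To make the abstract examples concrete, here is a hypothetical version of the second example (relation, field and file names are made up). Today the load and the common filter are executed once per store; with the changes above, an implicit split on cleaned lets both stores share a single pass over the input:

{{{
clicks  = load 'clicks' as (user: chararray, url: chararray, region: chararray);
cleaned = filter clicks by url is not null;
eu      = filter cleaned by region == 'eu';
us      = filter cleaned by region == 'us';
store eu into 'eu_clicks';
store us into 'us_clicks';
}}}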

[[Anchor(Storing_intermediate_results)]]
==== Storing intermediate results ====

Sometimes people will store intermediate results.

{{{
A=load ...
...
store A'
...
store A''
}}}

If the script doesn't re-load A' for the processing of A'', the steps above A' will be duplicated. This is basically a special case of the second example above, so the same steps are recommended. With the proposed changes the script will basically process A'' and dump A' as a side-effect, which is what the user probably wanted to begin with.
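
As a hypothetical concrete instance (all names made up), the cleaned relation below plays the role of A': it is both stored and aggregated further. Without re-loading 'cleaned_logs', everything above the first store is currently computed twice; with the proposed changes, storing it becomes a side-effect of the job that computes the counts:

{{{
raw     = load 'logs' as (user: chararray, url: chararray);
cleaned = filter raw by url is not null;
store cleaned into 'cleaned_logs';
grouped = group cleaned by user;
counts  = foreach grouped generate group, COUNT(cleaned);
store counts into 'counts_by_user';
}}}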

[[Anchor(Why?)]]
=== Why? ===

Pig's philosophy is: Optimize it yourself, why don't you.

However:

   * Implicit splits: it's probably what you expect when you use the same handle in different stores.
   * Store/Load vs Split: when optimizing, it's a reasonable assumption that splits are faster than load/store combinations.
   * Side-effects: there is currently no way to make use of them.

[[Anchor(Changes)]]
=== Changes ===

[[Anchor(Execution_in_batch_mode)]]
==== Execution in batch mode ====

Batch mode is entered when Pig is given a script to execute. Interactive mode is what you get on the grunt shell ("grunt:>"). Right now there isn't much difference between the two. In order to optimize the multi-query case, we'll need to distinguish them more clearly.

Right now, whenever the parser sees a store (or dump, explain, illustrate or describe) it will kick off the execution of that part of the script. Part of this proposal is that in batch mode we parse the entire script first and see if we can combine things to reduce the overall amount of work that needs to be done. Only after that will the execution start.

The following changes are proposed (in batch):

   * Store will not trigger an immediate execution. The entire script is considered before the execution starts.
   * Explicit splits will be put in places where a handle has multiple children. If the user wants to explicitly force re-computation of common ancestors, she has to provide multiple scripts (see the sketch below).
   * Multiple split branches/stores in the script will be combined into the same job, if possible. Again, using multiple scripts is the way to avoid this (if that is desired).
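
If re-computation of the common ancestors is really what the user wants, the workaround is to break the work into separate scripts, for example (hypothetical script contents; each script re-loads and re-filters the input on its own):

{{{
-- script1.pig
A = load 'input' as (x: int, y: int);
B = filter A by x > 0;
store B into 'outB';

-- script2.pig
A = load 'input' as (x: int, y: int);
C = filter A by y > 0;
store C into 'outC';
}}}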

For diagnostic operators there are some problems with this:

   * They work on handles, which only gives you a slice of the entire script execution at a time. What's more, at the point where they occur in a script they might not give you an accurate picture of the situation, since the execution plans might change once the entire script is handled.
   * They change the logical tree. This means that we would need to clone the tree before we run them - something that we want to avoid in batch execution.

The proposal therefore is:

   * Have Pig in batch mode ignore explain, dump, illustrate and describe.
   * Add a load command to the shell to execute a script in interactive mode.
   * Add scripts as a target (in addition to handles) to some of the diagnostic operators.
   * Add dot as an output type to explain (a graphical rendering of the graph will make multi-query explains more understandable).

That means that while someone is developing a Pig script, they can put any diagnostic operator into the script and then go to the grunt shell and load the script. The diagnostic statements will be executed and give information about their part of the script. When a script is loaded, the user will also be able to refer to any handles defined in the script from the shell.
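
A sketch of the intended workflow (script name and handle are hypothetical): loading the script runs it in interactive mode, executing its diagnostic operators, after which its handles can be inspected from the shell:

{{{
grunt> load myscript.pig
grunt> describe B
grunt> explain B
}}}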

Finally, when the script is ready, the user can run the same script in batch mode and all the diagnostic operators are ignored.

[[Anchor(Load)]]
==== Load ====

(See https://issues.apache.org/jira/browse/PIG-574 - this is basically the same 
as requested there)

The new command has the format:

{{{
load <script name>
}}}

This will run the script in interactive mode.

[[Anchor(Explain)]]
==== Explain ====

Changes to the command:

{{{
explain <script>||<handle> [using text||dot] [into <path>]
}}}

Behavior:

   * Explain is not executed in batch mode.
   * If explain is given a script, it will output the entire execution graph 
(logical, physical, MR + moving result files)

Text/Dot:

   * Text will give what we have today; dot will output a format that can be passed to the dot tool for graphical display.
   * In text mode, multiple outputs (splits) will be broken out into sections.
   * Default (no using clause): text

Path:

   * Will generate logical.[txt||dot], physical.[txt||dot], mapred.[txt||dot] 
in the specified directory.
   * Default (no path given): Stdout
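
A hypothetical usage sketch, assuming the syntax above (the exact quoting of the path is a guess); the generated dot files can then be rendered with Graphviz:

{{{
grunt> explain myscript.pig using dot into '/tmp/plans'

$ dot -Tpng /tmp/plans/mapred.dot -o mapred.png
}}}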

[[Anchor(Illustrate)]]
==== Illustrate ====

Changes to the command:

{{{
illustrate <script>||<handle> [into <file>]
}}}

Behavior:

   * Illustrate is not executed in batch mode.
   * If illustrate is given a script, it will output the entire execution graph 
(logical, physical, MR + moving result files)

File:

   * Will write the illustrate output into the specified file.
   * Default: Stdout
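
Correspondingly, a hypothetical usage sketch for illustrate (script, handle and file names are made up):

{{{
grunt> illustrate myscript.pig into '/tmp/illustrate.out'
grunt> illustrate B
}}}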

[[Anchor(Phases)]]
== Phases ==

These are the identified steps to get the proposal implemented.

[[Anchor(Phase_1)]]
=== Phase 1 ===
Phase one is about getting some infrastructure in place.

   * Batch execution (instead of single store execution)
   * Merge logical plans into a single plan
   * Updated explain/illustrate
   * Change the local and hadoop engines to execute multiple-store plans (with a store-load combination per split).

At the end of phase one we'll have implicit splits, but the MR plan will be the same as if the user had made the splits explicit.

[[Anchor(Phase_2)]]
=== Phase 2 ===
Phase two is about getting the MR Compiler to agree to this.

   * Allow multiple stores in single job
   * Merge multiple plans into the split operator
   * Terminate all but one split branch with a store

[[Anchor(Phase_3)]]
=== Phase 3 ===
Phase three is about using the combiner/reducer on multiple split branches in the same job.

   * Merge Combiner/Reducer plans
   * Put in logic to decide when to multiplex pipelines

[[Anchor(Internal)]]
== Internal Changes ==

[[Anchor(Grunt_parser_(Phase_1))]]
==== Grunt parser (Phase 1) ====
The parser currently uses a bottom-up approach. When it sees a store (or dump, explain), it goes bottom up and generates the plan that needs to happen for this particular store. In order to optimize the multi-query case, however, we need a view of the entire graph for a script (interactive mode can be handled differently).

In order to do this we will change the batch mode of the parser to:

   * Not execute the plan when we see a store (or dump, illustrate, describe, 
explain - which will be ignored)
   * Alter the already existing merge functionality to allow intersecting 
graphs to be joined into a single logical plan.
   * Wait until the entire script is parsed and merged before sending the plan 
on to do validation, optimization, etc.

The new "load" command will simply feed all the lines of a script through the 
interactive mode.

[[Anchor(Explain,_Dump,_and_Illustrate_(Phase_1))]]
==== Explain, Dump, Describe and Illustrate (Phase 1) ====

As described above the changes are:

   * Ignore these operations in batch mode
   * Add options to explain and illustrate to work on a script file as well as 
a handle.
   * Add the ability to print plans as dot files and to write explain and 
illustrate output to files.

There will be some work to nicely represent the graphs resulting from explain in text form. Right now, operators with multiple outputs result in the ancestor tree being duplicated for each output. It might be nicer to show the ancestors once and mark the other places as copies of that one.

[[Anchor(Local_Execution_engine_(Phase_1))]]
==== Local Execution engine (Phase 1) ====
We need to make the local execution engine understand multiple-store plans too. This might come for free, or at least cheaply. The current local engine uses a call to store() on each physical store node to trigger the execution of the pipeline and write out the result. Split is realized as a blocking operator that processes the entire input and hands out an iterator over the tuples.

This will give the right result, but re-processes all the dependencies once per store. In a later phase we might want to align this better with the hadoop engine and allow a non-blocking split, as well as separate the storing of the records from the pulling from the pipeline.

[[Anchor(Implicit_split_insertion_(Phase_1))]]
==== Implicit split insertion (Phase 1) ====
Implicit split insertion already exists as part of the optimizer. It translates any non-split logical node with multiple outputs into a split/split-output combination. This is what we need to put the splits for the multi-query optimization in place. Right now, however, the parser is set up in a way that multiple stores will never end up in the same plan, and thus the insertion doesn't happen for this case.

In short: once we change the parser to look at the entire graph for a script instead of working on a store-by-store basis, we will get this for free. We might actually have to add logic to suppress this behavior in cases where the split would be slower than the duplicated processing.

[[Anchor(Store/Multiple_output_(Phase_2))]]
==== Store/Multiple output (Phase 2) ====
If we put implicit splits in place and enhance splits to contain additional operators, or even multiplex the split output within the same map-reduce job, at some point either the map or the reduce stage needs to be able to produce multiple outputs. Currently there is a single output collector that will store the results in part-* files.

Here are some options:
[[Anchor(hadoop_0.19_supports_MultipleOutput)]]
===== hadoop 0.19 supports MultipleOutput =====
Link: 
http://hadoop.apache.org/core/docs/r0.19.0/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html#addNamedOutput(org.apache.hadoop.mapred.JobConf,%20java.lang.String,%20java.lang.Class,%20java.lang.Class,%20java.lang.Class)

All the output will still be in the same directory, but the developer can give names to different sets of output data. So, in our case we might name the outputs "split1" and "split2" and the output would come out as:

{{{
/outdir/split1-0000
/outdir/split1-0001
/outdir/split1-0002
/outdir/split2-0000
}}}

[[Anchor(Side-Effect_files)]]
===== Side-Effect files =====
Link: 
http://hadoop.apache.org/core/docs/current/mapred_tutorial.html#Task+Side-Effect+Files

[[Anchor(Store_operator_(Phase_2))]]
===== Store operator (Phase 2) =====
In the current system the store op is removed by the JobControlCompiler and used to set up the output directory, etc. It holds information about how and where to store the files, but doesn't actually do anything in the pipeline. The logic for storing records in the map-reduce case is handled by the map-only and map-reduce classes. The logic is simple: whatever comes out of the pipeline will be transformed into a key/value pair and collected using the output collector.

Using multiple outputs, it would make sense to let the store operator do the actual collection. The job compiler would have the duty to configure multiple output streams and then assign the right collectors to the store operators. The actual mappers/reducers will still have the responsibility to run the pipeline and check for errors, but the storing is handled by the store operator. Anything that is not stored and trickles out at the bottom of the pipeline goes into the standard collector. After the job is run, we will have to move the files to the right directories.

It seems to make more sense to use hadoop's multiple output functionality. Trying to build the same functionality with side-effect files would duplicate effort already made in hadoop 0.19. However, that way we might have to provide modes to run the queries differently depending on the version of hadoop.

[[Anchor(Split_operator_(Phase_2))]]
==== Split operator (Phase 2) ====

The goal is to make the split operator non-blocking. Instead of dumping a split immediately to disk, we'll try to keep going as long as possible. The split operator would return the same record as many times as there are children. This leaves you with multiple branches of the operator tree in the same map or reduce stage. These are going to be realized as nested plans inside the split.

[[Anchor(Multiplex/Demultiplex_(Phase_3))]]
===== Multiplex/Demultiplex (Phase 3) =====

The multiplex operation will serialize the different nested plans of the split into the same output stream. If there are multiple distinct output streams, it will add a key to distinguish between them. If all but one of the branches are terminated by a store, it will simply stream the single remaining one to the output.

Demuxing is done by the split operator followed by a special filter. The filter 
will only accept tuples for the particular pipeline.

In phase 2, we'll simply terminate all but one of the split branches with a store into a tmp dir.

[[Anchor(Combiner/Reducer_(Phase_3))]]
===== Combiner/Reducer (Phase 3) =====

Multiplexing different branches into one stream allows us to run a combiner on the result, reducing the amount of data that will be written. Otherwise we would have to dump everything in a map-only job and then start one or more MR jobs to pick up the pieces.
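
As an example of where this pays off, consider a hypothetical script in which both split branches end in an algebraic aggregation (COUNT). If both branches are multiplexed into the same map plan, a single combiner run can pre-aggregate both streams before the shuffle, instead of dumping raw records from a map-only job:

{{{
A = load 'logs' as (user: chararray, url: chararray);
split A into good if url is not null, bad if url is null;
g1 = group good by user;
goodcounts = foreach g1 generate group, COUNT(good);
g2 = group bad by user;
badcounts  = foreach g2 generate group, COUNT(bad);
store goodcounts into 'good_counts';
store badcounts into 'bad_counts';
}}}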

The current plan is to split the byte used for the join key between splits and joins. That leaves a nibble for each and limits the number of joins and splits per job to 16.

[[Anchor(MRCompiler_(Phase_2_and_3))]]
===== MRCompiler (Phase 2 and 3) =====
The MR Compiler right now looks for splits, terminates the MR job at that point 
and connects the remaining operators via load and store.

We'll add a new optimizer pass to look for these split scenarios. This gives us the ability to use the combiner plan information to make the determination of multiplexing or not (Phase 3) and also allows us to more easily switch back to the old-style handling if multiple outputs are not available.

[[Anchor(Parallelism_(Phase_3))]]
===== Parallelism (Phase 3) =====

If we multiplex outputs from different split branches we have to decide what to 
do with the requested parallelism: Max, sum or average?
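
For instance (hypothetical, reusing the relation names from the sketch above), if the two branches request different parallelism, the merged job has to settle on a single value: max would give 40, sum 50, and average 25 reducers.

{{{
g1 = group good by user parallel 10;
g2 = group bad by user parallel 40;
}}}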

[[Anchor(Diamond_problem_(Phase_3))]]
==== Diamond problem (Phase 3) ====
What happens when different split plans come back together?

This should come for free; we just need to make sure unions can handle multiple split branches.
