Dear Wiki user, You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.
The following page has been changed by GuntherHagleitner:
http://wiki.apache.org/pig/PigMultiQueryPerformanceSpecification

------------------------------------------------------------------------------
Will result in a job that produces bar, then the mv and rm are executed. Finally, another job is run that will generate foo.

- [[Anchor(Phases)]]
- == Phases ==
-
- These are the identified steps to get the proposal implemented.
-
- [[Anchor(Phase_1)]]
- === Phase 1 ===
- Phase one is about getting some infrastructural things in place.
-
- * Batch execution (instead of single store execution)
- * Merge logical plans into single
- * Updated explain/illustrate
- * Change local and hadoop engine to execute multiple store plans (with store-load per split).
-
- At the end of phase one, we'll have implicit splits. But the MR plan will be the same as if the user made the splits explicit.
-
- [[Anchor(Phase_2)]]
- === Phase 2 ===
- Phase two is about getting the MR Compiler to agree to this.
-
- * Allow multiple stores in single job
- * Merge multiple plans into the split operator
- * Terminate all but one with stores
-
- [[Anchor(Phase_3)]]
- === Phase 3 ===
- Phase three is about using the combiner/reducer on multiple split branches in the same job
-
- * Merge Combiner/Reducer plans
- * Put in logic to decide when to multiplex pipelines
-
[[Anchor(Internal)]]
== Internal Changes ==

- [[Anchor(Grunt_parser_(Phase_1))]]
+ [[Anchor(Grunt_parser)]]
- ==== GruntParser/PigServer (Phase 1) ====
+ ==== GruntParser/PigServer ====

The parser currently uses a bottom-up approach. When it sees a store (dump, explain), it goes bottom up and generates the plan that needs to happen for this particular store. In order to optimize the multi-query example, we need, however, a peek at the entire graph for a script (interactive mode is handled differently).

- The highlevel changes to the parser/server are:
+ The high-level changes to the parser/server are:

 * Do not execute the plan when we see a store or dump
 * Alter the already existing merge functionality to allow intersecting graphs to be joined into a single logical plan.

@@ -539, +508 @@

If the multi-query optimization is turned off, all graphs will be generated as interactive, which is how we revert the behavior.

- The merging of the different logical plans is done in the OperatorPlan, where merges can now either check for disjoint graphs or merge them with overlaps.
+ The merging of the different logical plans is done in the OperatorPlan, where merges can now either check for disjoint graphs or merge them with overlaps. Merged plans are later passed on to the implicit split inserter, at which point all the overlapping operators from the merge will result in an explicit split.

Finally, the store-load handling is done in the pig server. It will either transform the plan or add a store-load connection. Absolute filenames will be available, since the QueryParser now translates them when it sees them.

The grunt parser makes use of the new PigServer APIs. It will use the batch API to parse and execute scripts (a small usage sketch follows after the Explain subsection below). Since scripts can be nested, the script execution and the stack of graphs in the PigServer are closely related.

- [[Anchor(Explain_(Phase_1))]]
+ [[Anchor(Explain)]]
- ==== Explain (Phase 1) ====
+ ==== Explain ====

As described above, the changes are:

@@ -564, +533 @@

CoGroups are special in that their nested plans are connected to input operators. This is modeled as subgraphs as well, and the subgraphs are connected to the respective input operator.
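+ 
+ To make the batch flow concrete, here is a rough Java sketch of how a caller could drive the PigServer batch API described above. The method and type names used here (setBatchOn, registerQuery, executeBatch, ExecJob) reflect our reading of the new API and are meant as an illustration, not a fixed contract.
+ 
+ {{{
+ import java.util.List;
+ 
+ import org.apache.pig.ExecType;
+ import org.apache.pig.PigServer;
+ import org.apache.pig.backend.executionengine.ExecJob;
+ 
+ public class MultiQueryBatch {
+     public static void main(String[] args) throws Exception {
+         // Local mode keeps the sketch self-contained; MAPREDUCE would need a cluster.
+         PigServer pig = new PigServer(ExecType.LOCAL);
+ 
+         // Queue the statements instead of executing a plan for each store as it is seen.
+         pig.setBatchOn();
+         pig.registerQuery("a = load 'input' as (x, y);");
+         pig.registerQuery("b = filter a by x > 0;");
+         pig.registerQuery("store b into 'out1';");
+         pig.registerQuery("store a into 'out2';");
+ 
+         // Both stores end up in one merged logical plan and are submitted together.
+         List<ExecJob> jobs = pig.executeBatch();
+         System.out.println("jobs run: " + jobs.size());
+     }
+ }
+ }}}
+ 
+ This mirrors the behavior described above: nothing runs when a store is seen, and the stores are merged into a single logical plan when the batch is executed.
+ 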
+ [[Anchor(Map Reduce Engine)]]
+ ==== Map Reduce Engine ====
+ 
+ Let's first look at how a multi-query is processed in the hadoop case. We have already discussed how the logical plan is generated, driven by the PigServer and GruntParser. The next step is the translation of the logical plan to the physical plan. This has not changed, except that in the multi-query case it has to carry the connections from store to load operators through to the physical plan.
+ 
+ [[Anchor(MRCompiler)]]
+ ===== MRCompiler =====
+ 
+ After that, the MRCompiler takes the physical plan and breaks it into a plan of map-reduce operators. The handling of split operators has not changed. What happens is:
+ 
+ When a split is encountered, it is removed from the plan and replaced by a store to a temporary path with BinStorage. The split branches are placed in new map-reduce operators that load from the temporary path.
+ 
+ Some minor changes to the MRCompiler are:
+ 
+ * Edges from store to load operators are removed and their respective map-reduce operators are connected. This enforces that the store job runs before the load job.
+ * Jobs that result from a split are marked as splitters and splittees, so that the optimization step can look at them later.
+ 
+ [[Anchor(MultiQueryOptimizer)]]
+ ===== MultiQueryOptimizer =====
+ 
+ After the compiler, the optimizers are run. The last one in the chain is the MultiQueryOptimizer. This optimizer looks at all splitters and splittees and merges them recursively, producing what was described above in ExecutionPlans. Since this step is run after the combiner optimizer, it can also merge combiners if necessary. If the optimizer cannot merge, it keeps the existing store that the split produced in the MRCompiler.
+ 
+ The merging of splittees into a splitter consists of:
+ 
+ * Creating a split operator in the map or reduce and setting the splittee plans as nested plans of the split
+ * If it needs to merge combiners, it introduces a Demux operator to route the input from mixed split branches in the mapper to the right combine plan (see the routing sketch below). The separate combiner plans are the nested plans of the Demux operator.
+ * If a map-reduce operator does not have a combiner, it inserts a FakeLocalRearrange operator to simply route the input through.
+ * If it needs to merge reduce plans, it does so using the Demux operator the same way the combiner is merged.
+ 
+ Note: In the end this merging results in Split or Demux operators with multiple stores tucked away in their nested plans.
+ 
+ There are two more minor optimizers:
+ * NoopFilterRemover will remove the constant-true filters that splits produce
+ * NoopStoreRemover will remove implicit stores to temporary files, if there is a user-initiated store that contains the same information
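+ 
+ As a rough illustration of the routing that the merged plans rely on, here is a small, self-contained Java sketch that dispatches records to nested plans by a branch index. All type names are illustrative placeholders, not Pig's operator classes, and the assumption that each record carries the index of the split branch it came from is made only for the sake of the example.
+ 
+ {{{
+ import java.util.Arrays;
+ import java.util.List;
+ import java.util.function.Consumer;
+ 
+ public class DemuxSketch {
+ 
+     // A record tagged with the split branch (pipeline) it originated from.
+     static class TaggedRecord {
+         final int pipelineIndex;
+         final Object value;
+         TaggedRecord(int pipelineIndex, Object value) {
+             this.pipelineIndex = pipelineIndex;
+             this.value = value;
+         }
+     }
+ 
+     public static void main(String[] args) {
+         // Two hypothetical nested plans (e.g. two combine plans after a merge).
+         List<Consumer<Object>> nestedPlans = Arrays.asList(
+                 v -> System.out.println("plan 0 got " + v),
+                 v -> System.out.println("plan 1 got " + v));
+ 
+         List<TaggedRecord> input = Arrays.asList(
+                 new TaggedRecord(0, "a"), new TaggedRecord(1, "b"));
+ 
+         // The demux step: attach each record to the plan its index points at.
+         for (TaggedRecord r : input) {
+             nestedPlans.get(r.pipelineIndex).accept(r.value);
+         }
+     }
+ }
+ }}}
+ 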
+ [[Anchor(MapReduceLauncher)]]
+ ===== MapReduceLauncher =====
+ 
+ The map reduce launcher receives the map-reduce operator plan from the compilation stage. It uses the JobControlCompiler to compile map-reduce operators into hadoop job control objects and submits them to hadoop for execution.
+ 
+ The biggest change stems from the fact that with the current hadoop system we need to store multiple outputs from a single job in a temporary directory first and then move them to their real locations. Map-reduce operators that are connected in the graph mean that the successor needs to read files that the parent produces. In order to do that, we need to submit the graph in several steps, and after each step we need to move the files to their final locations. So the overall job submission works like this:
+ 
+ * Remove all root operators
+ * Compile these operators into jobs
+ * Submit the jobs to hadoop
+ * Move result files or collect failures
+ * Repeat until the map-reduce plan is empty
+ 
+ [[Anchor(JobControlCompiler)]]
+ ===== JobControlCompiler =====
+ 
+ The JobControlCompiler distinguishes between multi-store and single-store map-reduce operators. Single-store operators are processed the same way as before: the store is removed and the job configuration is set up to put the records that come out of the pipeline in the right files.
+ 
+ Multi-store plans are handled differently. The compiler sets the output path to a newly created temporary directory. It also leaves the store operators in the plan; these operators will, at execution time, create subdirectories of the temporary path and direct records to them.
+ 
+ Let's assume we have 2 stores in the plan to directories "foo" and "/out/bar". After running the job one gets the following temporary directory structure:
+ 
+ {{{
+ /tmp/temp-<number>/abs/out/bar/part-*
+ /tmp/temp-<number>/rel/foo/part-*
+ }}}
+ 
+ The JobControlCompiler will then, as a final step, move the result files to their final locations.
+ 
+ [[Anchor(Store Operator)]]
+ ===== Store Operator =====
+ 
+ Finally, after the MapReduceLauncher has submitted the jobs, execution starts on the backend.
+ 
+ The store operator was changed to deal with multiple outputs from a single map-reduce job as well as to enable the local engine to handle multiple stores in the same plan.
+ 
+ The operator itself has a store implementation that is instantiated by the execution engine that handles the output. Store has a "getNext" function that stores the input record and returns a null record. That basically means that stores can be placed into an execution pipeline as an operator with a side effect that consumes all input records.
+ 
+ The local engine's implementation of the store is straightforward. It creates the StoreFunc and binds the output stream to it.
+ 
+ The map reduce engine's implementation of the store needs to set up a few more things. It creates a copy of the job configuration and overwrites some key fields. It changes the output and temporary work output directory to point to a temporary path specific to the store being processed (see JobControlCompiler). It also sets PigOutputFormat as the output class and registers the actual store function that PigOutputFormat is going to use. Finally, it creates a new record writer and output collector with these settings.
+ 
+ [[Anchor(Split_operator)]]
+ ===== Split operator =====
+ 
+ The split operator can now be used at execution time too. It used to always be removed by the compiler (and replaced by store-load combinations). So, here's how the split operator works (see the sketch after this list):
+ 
+ * Every input record is attached to each nested plan in sequence
+ * Until a plan is exhausted, the split keeps returning records from that plan
+ * Once EOP is returned, records are pulled from the next plan
+ * If all plans have been dealt with, the next input record is fetched
+ * If there are no more input records, the EOP is passed on to the requester
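+ 
+ The following self-contained Java sketch models the pull behavior in the list above in simplified form. Nested plans are modeled as functions from one input record to the records they emit; this illustrates the control flow only and is not Pig's split operator code.
+ 
+ {{{
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collections;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.function.Function;
+ 
+ public class SplitSketch {
+ 
+     // Drain the nested plans for every input record, one plan at a time.
+     static List<String> runSplit(Iterator<String> input,
+                                  List<Function<String, List<String>>> nestedPlans) {
+         List<String> output = new ArrayList<>();
+         while (input.hasNext()) {                     // fetch the next input record
+             String record = input.next();
+             for (Function<String, List<String>> plan : nestedPlans) {
+                 // attach the record to this plan and pull until it is exhausted
+                 output.addAll(plan.apply(record));
+             }
+         }
+         return output;                                // no more input: end of pipeline
+     }
+ 
+     public static void main(String[] args) {
+         Function<String, List<String>> planA = r -> Arrays.asList(r + "-A");
+         Function<String, List<String>> planB =
+                 r -> r.equals("r1") ? Arrays.asList(r + "-B") : Collections.emptyList();
+ 
+         List<String> out = runSplit(Arrays.asList("r1", "r2").iterator(),
+                                     Arrays.asList(planA, planB));
+         System.out.println(out);  // [r1-A, r1-B, r2-A]
+     }
+ }
+ }}}
+ 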
+ [[Anchor(DemuxOperator)]]
+ ===== Demux Operator =====
+ 
+ The demux operator is used in combiners and reducers where the input is a mix of different split plans of the mapper. It will decide which of its nested plans a record belongs to and then attach it to that particular plan.
+ 
- [[Anchor(Local_Execution_engine_(Phase_1))]]
+ [[Anchor(Local_Execution_engine)]]
- ==== Local Execution engine (Phase 1) ====
+ ==== Local Execution Engine ====
- We need to make the local execution engine understand multiple store plans too. This might come for free or at least cheaply. The current local engine uses a call to store() on each physical store node to trigger the execution of the pipeline and write out the result. Split is realized as a blocking operator that will process the entire input and hands out an iterator to the tuples.
+ The local engine has not changed as much as the map reduce engine. The local engine executes the physical plan directly. The main changes were:
- This will give the right result, but re-processes all the dependencies once per store. We might in a later phase want to align this better with the hadoop engine and allow a non blocking split as well as a separating the storing of the records with the pulling from the pipeline.
+ * Allow for multiple stores in the physical plan
+ * Respect the order of execution when there are edges between store and load operators
+ 
+ The local engine will thus execute the physical plan as follows (see the sketch after this list):
-
- [[Anchor(Implicit_split_insertion_(Phase_1))]]
- ==== Implicit split insertion (Phase 1) ====
- Implicit split insertion already exists as part of the optimizer. It translates any non-split logical node with multiple outputs into a split - split output combination. This is what we need to put the splits for the multi-query optimization in place. Right now, however, the parser is set up in a way that multiple stores will never end up in the same plan and thus the insertion doesn't happen for this case.
- In short: Once we change the parser to look at the entire graph for a script instead of a store-by-store basis, we will get this for free. We might actually have to add logic to suppress this behavior in cases where the split will be slower than the dual processing.
+ 
+ * Get all the store operators with dependent load operators in dependency order
+ * Execute these stores one by one
+ * Get all remaining stores and execute them all at once
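+ 
+ A minimal Java sketch of this ordering, with a placeholder Store type that is not Pig's store operator: stores with a dependent load are executed first, in dependency order, and the remaining stores are executed afterwards.
+ 
+ {{{
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.List;
+ 
+ public class LocalExecOrderSketch {
+ 
+     static class Store {
+         final String name;
+         final boolean hasDependentLoad;   // some later load reads this store's output
+         Store(String name, boolean hasDependentLoad) {
+             this.name = name;
+             this.hasDependentLoad = hasDependentLoad;
+         }
+         void execute() { System.out.println("executing store " + name); }
+     }
+ 
+     static void execute(List<Store> storesInDependencyOrder) {
+         List<Store> remaining = new ArrayList<>();
+         for (Store s : storesInDependencyOrder) {
+             if (s.hasDependentLoad) {
+                 s.execute();              // must finish before the dependent load runs
+             } else {
+                 remaining.add(s);
+             }
+         }
+         for (Store s : remaining) {       // everything else runs in one go
+             s.execute();
+         }
+     }
+ 
+     public static void main(String[] args) {
+         execute(Arrays.asList(new Store("intermediate", true),
+                               new Store("out1", false),
+                               new Store("out2", false)));
+     }
+ }
+ }}}
+ 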
- [[Anchor(Store/Multiple_output_(Phase_2))]]
- ==== Store/Multiple output (Phase 2) ====
- If we put implicit splits in place and enhance splits to contain additional operators or even multiplex the split output within the same map reduce job, at some point either a map or a reduce job need to be able to produce multiple outputs. Currently there is a single output collector that will store the results in part-* files.
-
- Here are some options:
- [[Anchor(hadoop_0.19_supports_MultipleOutput)]]
- ===== hadoop 0.19 supports MultipleOutput =====
- Link: http://hadoop.apache.org/core/docs/r0.19.0/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html#addNamedOutput(org.apache.hadoop.mapred.JobConf,%20java.lang.String,%20java.lang.Class,%20java.lang.Class,%20java.lang.Class)
-
- All the output will still be in the same directory, but the developer can give names for different sets of output data.
- So, in our case we might name the output "split1" and "split2" and the output would come out to be:
-
- {{{
- /outdir/split1-0000
- /outdir/split1-0001
- /outdir/split1-0002
- /outdir/split2-0000
- }}}
-
- [[Anchor(Side-Effect_files)]]
- ===== Side-Effect files =====
- Link: http://hadoop.apache.org/core/docs/current/mapred_tutorial.html#Task+Side-Effect+Files
-
- [[Anchor(Store_operator_(Phase_2))]]
- ===== Store operator (Phase 2) =====
- In the current system the store op is removed by JobControlCompiler and used to set up the output directory, etc. It holds information about how and where to store the files, but doesn't actually do anything in the pipeline. The logic for storing records in the map-reduce case, is handled by the map only and map reduce classes. The logic is simple: Whatever comes out of the pipeline will be transformed into a key/value pair collected using the output collector.
-
- Using multiple output it would make sense to let the store operator do the actual collection. The job compiler would have the duty to configure multiple output streams and then assign the right collectors to the store operator. The actual mapper/reducers will still have the responsibility to run the pipeline and check for errors but the storing is handled by the store operator. Anything that is not stored and trickles out at the bottom of the pipeline goes into the standard collector. After the job is run we will have to move the files to the right directories.
-
- It seems to make more sense to use hadoop's multiple output functionality. Trying to build the same functionality with side-effect files will duplicate efforts made in hadoop 19. However, that way we might have to provide modes to run the queries differently depending on the version of hadoop.
-
- [[Anchor(Split_operator_(Phase_2))]]
- ==== Split operator (Phase 2) ====
-
- The goal is to make the split operator non-blocking. Instead of dumping a split immediately to disk we'll try to keep on going as long as possible. So the split operator would return the same record for as many times as there are children. This leaves you with multiple branches of the operator tree in the same map or reduce stage. These are going to be realized as nested plans inside split.
-
- [[Anchor(Multiplex/Demultiplex_(Phase_3))]]
- ===== Multiplex/Demultiplex (Phase 3) =====
-
- The multiplex operation will serialize different nested plans of the split into the same output stream. If there are multiple distinct output streams it will add a key to distinguish between those. If all but one of the branches are terminated by a store, it will simple stream the single remaining one to the output.
-
- Demuxing is done by the split operator followed by a special filter. The filter will only accept tuples for the particular pipeline.
-
- In phase 2, we'll simply terminate all but one split branch and store it into a tmp dir.
-
- [[Anchor(Combiner/Reducer_(Phase_3))]]
- ===== Combiner/Reducer (Phase 3) =====
-
- Multiplexing different branches into one stream allows us to run a combiner on the result - reducing the amount of data that will written. Otherwise we would have to dump everything in a map only job and then start one or more MR jobs to pick up the pieces.
-
- The current plan is to split the byte used for the join key between splits and joins. That leaves a nibble for each and reduces the number of joins and splits to 16.
-
- [[Anchor(MRCompiler_(Phase_2_and_3))]]
- ===== MRCompiler (Phase 2 and 3) =====
- The MR Compiler right now looks for splits, terminates the MR job at that point and connects the remaining operators via load and store.
-
- We'll add a new optimizer pass to look for these split scenarios. This gives us the ability to use the combiner plan information to make the determination of multiplexing or not (Phase 3) and also allows us more easily to switch back to the old style handling, if multiple outputs are not available.
-
- [[Anchor(Parallelism_(Phase_3))]]
- ===== Parallelism (Phase 3) =====
-
- If we multiplex outputs from different split branches we have to decide what to do with the requested parallelism: Max, sum or average?
-