New attachment added to page PigAccumulatorSpec/homes/yinghe/Desktop on Pig Wiki
Dear Wiki user,

You have subscribed to a wiki page PigAccumulatorSpec/homes/yinghe/Desktop for change notification.

An attachment has been added to that page by yinghe. The following detailed information is available:

Attachment name: SequenceDiagram.jpg
Attachment size: 51846
Attachment link: http://wiki.apache.org/pig/PigAccumulatorSpec/homes/yinghe/Desktop?action=AttachFile&do=get&target=SequenceDiagram.jpg
Page link: http://wiki.apache.org/pig/PigAccumulatorSpec/homes/yinghe/Desktop
[Pig Wiki] Update of LoadStoreRedesignProposal by ThejasNair
Dear Wiki user,

You have subscribed to a wiki page or wiki category on Pig Wiki for change notification.

The LoadStoreRedesignProposal page has been changed by ThejasNair.
http://wiki.apache.org/pig/LoadStoreRedesignProposal?action=diff&rev1=32&rev2=33

--

  '''Problem 2''': !PoissonSampleLoader samples 17 tuples from every set of tuples that will fit into reducer memory (see PigSkewedJoinSpec). Let us call this number of tuples that fit into reducer memory X, i.e. we need to sample one tuple every X/17 tuples.
- Earlier, the number of tuples to be sampled was calculated before the tuples were read, in !PoissonSampleLoader.computeSamples(..). To get the number of samples to be sampled in a map, the formula used was = number-of-reducer-memories-needed * 17 / number-of-splits
+ Earlier, the number of tuples to be sampled was calculated before the tuples were read, in !PoissonSampleLoader.computeSamples(..). To get the number of samples to be sampled in a map, the formula used was = number-of-reducer-memories-needed * 17 / number-of-splits <<BR>>
  Where
-
- number-of-reducer-memories-needed = (total_file_size * disk_to_mem_factor)/available_reducer_heap_size
+ number-of-reducer-memories-needed = (total_file_size * disk_to_mem_factor)/available_reducer_heap_size<<BR>>
  disk_to_mem_factor has a default of 2.
  Then !PoissonSampleLoader would return sampled tuples by skipping split-size/num_samples bytes at a time.
- With the new loader we have to skip some number of tuples instead of bytes. But we don't have an estimate of the total number of tuples in the input.
+ With the new loader we have to skip some number of tuples instead of bytes. But we don't have an estimate of the total number of tuples in the input.<<BR>>
  One way to work around this would be to use the size of a tuple in memory to estimate the size of a tuple on disk using the above disk_to_mem_factor; then the number of tuples to be skipped will be = (split-size/avg_mem_size_of_tuple)/numSamples
  But the use of disk_to_mem_factor is very dubious: the real disk_to_mem_factor will vary based on compression algorithm, data characteristics (sorting etc.), and encoding.
  '''Solution''':
- The goal is to sample one tuple every X/17 tuples. (X = number of tuples that fit in available reducer memory)
+ The goal is to sample one tuple every X/17 tuples. (X = number of tuples that fit in available reducer memory).<<BR>>
- To estimate X, we can use available_reducer_heap_size/average-tuple-mem-size
+ To estimate X, we can use available_reducer_heap_size/average-tuple-mem-size.<<BR>>
  Number of tuples skipped for every sampled tuple = 1/17 * (available_reducer_heap_size/average-tuple-mem-size)
  The average-tuple-mem-size and number-of-tuples-to-be-skipped-every-sampled-tuple are recalculated after a new tuple is sampled.
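The old byte-based estimate and the new tuple-based skip interval can be compared in a small Java sketch. The class and method names (SampleRateSketch, oldSamplesPerMap, tuplesToSkip, recordSample) are hypothetical and are not Pig's PoissonSampleLoader code. Only the constant 17, the default disk_to_mem_factor of 2, and the formulas come from the wiki text. As the Solution paragraph describes, the running average tuple memory size is recomputed after each sampled tuple. How a tuple's in-memory size is measured is left to the caller here.

```java
/**
 * Hypothetical sketch of the sampling-rate estimates in LoadStoreRedesignProposal.
 * Not Pig's PoissonSampleLoader; names are illustrative only.
 */
final class SampleRateSketch {
    // Tuples sampled per reducer memory's worth of tuples, per the proposal.
    static final int SAMPLES_PER_REDUCER_MEMORY = 17;
    // Default disk_to_mem_factor used by the old, byte-based estimate.
    static final double DEFAULT_DISK_TO_MEM_FACTOR = 2.0;

    private final long availableReducerHeapSize;
    private long sampledTuples = 0;
    private double totalSampledMemSize = 0.0;

    SampleRateSketch(long availableReducerHeapSize) {
        this.availableReducerHeapSize = availableReducerHeapSize;
    }

    /** Old approach: samples per map, computed before any tuple is read. */
    static long oldSamplesPerMap(long totalFileSize, double diskToMemFactor,
                                 long availableReducerHeapSize, int numSplits) {
        double reducerMemoriesNeeded =
                (totalFileSize * diskToMemFactor) / availableReducerHeapSize;
        return (long) (reducerMemoriesNeeded * SAMPLES_PER_REDUCER_MEMORY / numSplits);
    }

    /** New approach: X = heap / average tuple size; skip X / 17 tuples per sample. */
    static long tuplesToSkip(long availableReducerHeapSize, double avgTupleMemSize) {
        double x = availableReducerHeapSize / avgTupleMemSize;
        return (long) (x / SAMPLES_PER_REDUCER_MEMORY);
    }

    /** Call after each sampled tuple; returns the refreshed skip interval. */
    long recordSample(long tupleMemSize) {
        sampledTuples++;
        totalSampledMemSize += tupleMemSize;
        double avgTupleMemSize = totalSampledMemSize / sampledTuples;
        return tuplesToSkip(availableReducerHeapSize, avgTupleMemSize);
    }
}
```

For example, with 17,000,000 bytes of reducer heap and an average tuple size of 100 bytes, X is 170,000 tuples. One tuple is then sampled for every 10,000 skipped. If the next sampled tuple is 300 bytes, the average rises to 200 and the interval drops to 5,000.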
[Pig Wiki] Update of PigAccumulatorSpec by yinghe
Dear Wiki user,

You have subscribed to a wiki page or wiki category on Pig Wiki for change notification.

The PigAccumulatorSpec page has been changed by yinghe.
http://wiki.apache.org/pig/PigAccumulatorSpec?action=diff&rev1=2&rev2=3

--

  }}}
  == When to Call Accumulator ==
- . MR plan is evaluated by an AccumulatorOptimizer to check if it is eligible to run in accumulative mode. Before AccumulatorOptimizer is called, another optimizer, SecondaryKeyOptimizer, should be called first. This optimizer checks if POSort or PODistinct in the inner plan of foreach can be removed/replaced by using the secondary sorting key supported by Hadoop. If it is POSort, then it is removed. If it is PODistinct, it is replaced by POSortedDistinct. Because of this optimizer, the last two use cases with order by and distinct inside the foreach inner plan can still run in accumulative mode.
+ . MR plan is evaluated by an AccumulatorOptimizer to check if it is eligible to run in accumulative mode. Before AccumulatorOptimizer is called, another optimizer, SecondaryKeyOptimizer, should be called first. This optimizer checks if POSort or PODistinct in the inner plan of foreach can be removed/replaced by using the secondary sorting key supported by Hadoop. If it is POSort, then it is removed. If it is PODistinct, it is replaced by POSortedDistinct. Because of this optimizer, the last two use cases with order by and distinct inside the foreach inner plan can still run in accumulative mode. The AccumulatorOptimizer checks the reducer plan and enables the accumulator if the following criteria are met:
- The AccumulatorOptimizer checks the reducer plan and enables the accumulator if the following criteria are met:
  * The reducer plan uses POPackage as root, not any of its sub-classes. POPackage is not for distinct, and none of its inputs is set as inner.
  * The successor of POPackage is a POForEach.
  * The leaves of each POForEach input plan are ExpressionOperators, and each must be one of the following:
@@ -109, +108 @@
  {{attachment:/homes/yinghe/Desktop/SequenceDiagram.jpg}}
+ == Internal Changes ==
+ === Accumulator ===
+ . A new interface that a UDF can implement if it can run in accumulative mode.
+
+ === PhysicalOperator ===
+ . Add new methods setAccumulative(), setAccumStart(), setAccumEnd() to flag a physical operator to run in accumulative mode, and to mark the start and end of accumulation. This change is in the patch for PIG-1038.
+
+ === MapReduceLauncher ===
+ . Create AccumulatorOptimizer and use it to visit the plan.
+
+ === AccumulatorOptimizer ===
+ . Another MROpPlanVisitor. It checks the reduce plan; if the plan meets all the criteria, it sets the accumulative flag on POPackage and POForEach. It is created and invoked by MapReduceLauncher.
+
+ === POStatus ===
+ . Add a new state STATUS_BATCH_OK to indicate a batch is processed successfully in accumulative mode.
+
+ === POForEach ===
+ . If its accumulative flag is set, the bags passed to it through a tuple are AccumulativeBags rather than regular tuple bags. It gets the AccumulativeTupleBuffer from the bag, then runs a while loop that calls nextBatch() on the AccumulativeTupleBuffer and passes the input to the inner plans. If an inner plan contains any UDF, the inner plan returns POStatus.STATUS_BATCH_OK if the current batch is processed successfully. When there are no more batches to process, POForEach notifies each inner plan that accumulation is done, makes a final call to get the result, and exits the while loop. Finally, POForEach returns the result to its successor in the reducer plan. The operators that call POForEach don't need to know whether POForEach got its result in regular mode or accumulative mode.
+
+ === AccumulativeBag ===
+ . An implementation of DataBag used by POPackage for processing data in accumulative mode. This bag doesn't contain all tuples from the iterator. Instead, it wraps an AccumulativeTupleBuffer, which contains the iterator to pull tuples out in batches. Calling iterator() on this bag only gives you the tuples for the current batch.
+
+ === AccumulativeTupleBuffer ===
+ . An underlying buffer that is shared by all AccumulativeBags (one bag for group by, multiple bags for cogroup) generated by POPackage. POPackage has an inner class which implements this interface. POPackage creates an instance of this buffer and sets it into the AccumulativeBags. This buffer has methods to retrieve the next batch of tuples, which in turn call methods of POPackage to read tuples out of the iterator and put them in an internal list. The AccumulativeBag has access to that list to return an iterator of tuples.
+
+ === POPackage ===
+ . If its accumulative flag is set, it creates an AccumulativeBag and an AccumulativeTupleBuffer instead of default tuple bags. It then sets the AccumulativeTupleBuffer into the AccumulativeBag, and sets the AccumulativeBag into the result tuple.
+ POPackage also has
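The accumulative flow described in this change can be sketched in plain Java. In that flow, a shared buffer is refilled one batch at a time, and POForEach loops until no batches remain, then makes one final call for the result. Everything below is a hypothetical stand-in. AccumulatorSketch, SumAccumulator, TupleBufferSketch and AccumulativeForEachSketch are invented names, and Integer values replace Pig Tuples. The accumulate/getValue/cleanup method names are assumed for illustration rather than taken from this page.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for a UDF that can run in accumulative mode. */
interface AccumulatorSketch<T> {
    void accumulate(List<Integer> batch); // called once per batch
    T getValue();                          // final result after the last batch
    void cleanup();                        // reset state before the next key
}

/** Example accumulative UDF: sums values without holding the whole bag. */
final class SumAccumulator implements AccumulatorSketch<Long> {
    private long sum = 0;
    public void accumulate(List<Integer> batch) { for (int v : batch) sum += v; }
    public Long getValue() { return sum; }
    public void cleanup() { sum = 0; }
}

/** Pulls values from the reducer's iterator in fixed-size batches into one shared list. */
final class TupleBufferSketch {
    private final Iterator<Integer> values;
    private final int batchSize;
    private final List<Integer> current = new ArrayList<>();

    TupleBufferSketch(Iterator<Integer> values, int batchSize) {
        this.values = values;
        this.batchSize = batchSize;
    }

    boolean hasNextBatch() { return values.hasNext(); }

    /** Refills the shared list; a bag view over it sees only the current batch. */
    void nextBatch() {
        current.clear();
        while (current.size() < batchSize && values.hasNext()) {
            current.add(values.next());
        }
    }

    List<Integer> currentBatch() { return current; }
}

final class AccumulativeForEachSketch {
    /** Mirrors the POForEach loop: feed each batch, then make one final call for the result. */
    static <T> T run(TupleBufferSketch buffer, AccumulatorSketch<T> udf) {
        while (buffer.hasNextBatch()) {
            buffer.nextBatch();
            udf.accumulate(buffer.currentBatch()); // analogous to one STATUS_BATCH_OK round
        }
        T result = udf.getValue();
        udf.cleanup();
        return result;
    }
}
```

Only one batch lives in the shared list at a time, so a UDF such as a sum never needs the whole bag in memory. The caller of run() still sees the single result it would get in regular mode.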
svn commit: r880975 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
Author: gates
Date: Mon Nov 16 21:43:41 2009
New Revision: 880975

URL: http://svn.apache.org/viewvc?rev=880975&view=rev
Log:
PIG-1085: Pass JobConf and UDF specific configuration information to UDFs.

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=880975&r1=880974&r2=880975&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Nov 16 21:43:41 2009
@@ -23,6 +23,8 @@
 INCOMPATIBLE CHANGES
 IMPROVEMENTS
+PIG-1085: Pass JobConf and UDF specific configuration information to UDFs
+ (gates)
 OPTIMIZATIONS

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=880975&r1=880974&r2=880975&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Nov 16 21:43:41 2009
@@ -81,6 +81,7 @@
 import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.UDFContext;
 
 /**
  * This is compiler class that takes an MROperPlan and converts
@@ -596,6 +597,9 @@
             jobConf.setOutputCommitter(PigOutputCommitter.class);
             Job job = new Job(jobConf);
             jobStoreMap.put(job,new Pair<List<POStore>, Path>(storeLocations, tmpLocation));
+
+            // Serialize the UDF specific context info.
+            UDFContext.getUDFContext().serialize(jobConf);
             return job;
         } catch (JobCreationException jce) {
             throw jce;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=880975&r1=880974&r2=880975&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Mon Nov 16 21:43:41 2009
@@ -49,6 +49,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.impl.util.UDFContext;
 
 public abstract class PigMapBase extends MapReduceBase{
     private static final Tuple DUMMYTUPLE = null;
@@ -166,6 +167,12 @@
             keyType = ((byte[])ObjectSerializer.deserialize(job.get("pig.map.keytype")))[0];
 
             pigReporter = new ProgressableReporter();
+
+            // Get the UDF specific context
+            UDFContext udfc = UDFContext.getUDFContext();
+            udfc.addJobConf(job);
+            udfc.deserialize();
+
             if(!(mp.isEmpty())) {
                 List<OperatorKey> targetOpKeys =
                     (ArrayList<OperatorKey>)ObjectSerializer.deserialize(job.get("map.target.ops"));
@@ -178,7 +185,6 @@
                 }
-
             } catch (IOException ioe) {
             String msg = "Problem while configuring map plan.";
             throw new RuntimeException(msg, ioe);

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=880975&r1=880974&r2=880975&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Mon Nov 16 21:43:41 2009
@@ -57,6 +57,7 @@
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.SpillableMemoryManager;
+import
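This commit serializes the UDF context in JobControlCompiler by calling `UDFContext.getUDFContext().serialize(jobConf)`. PigMapBase restores it by calling `addJobConf(job)` followed by `deserialize()`. The round trip can be sketched without Hadoop, under several stated assumptions. A `Map<String, String>` stands in for JobConf. Standard Java serialization plus `java.util.Base64` replaces Pig's ObjectSerializer. The configuration key `sketch.udf.context` is made up. None of this is the actual UDFContext implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Hypothetical sketch of the front-end/back-end hand-off added in PIG-1085.
 * A Map<String, String> stands in for Hadoop's JobConf.
 */
final class UdfContextHandoffSketch {
    static final String CONF_KEY = "sketch.udf.context"; // hypothetical key name

    /** Front end: store every UDF's Properties in the job configuration as one string. */
    static void serialize(HashMap<Integer, Properties> udfConfs, Map<String, String> jobConf)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(udfConfs); // HashMap, Integer and Properties are all Serializable
        }
        jobConf.put(CONF_KEY, Base64.getEncoder().encodeToString(bytes.toByteArray()));
    }

    /** Back end (map or reduce setup): rebuild the same map from the configuration. */
    @SuppressWarnings("unchecked")
    static HashMap<Integer, Properties> deserialize(Map<String, String> jobConf)
            throws IOException, ClassNotFoundException {
        String encoded = jobConf.get(CONF_KEY);
        if (encoded == null) {
            return new HashMap<>(); // nothing was stored on the front end
        }
        byte[] raw = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return (HashMap<Integer, Properties>) in.readObject();
        }
    }
}
```

The job configuration is the one object that travels from the front end to every map and reduce task. Storing the UDF properties as a string inside it is therefore a natural carrier for information a UDF recorded at plan time.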
svn commit: r881008 - in /hadoop/pig/trunk: src/org/apache/pig/impl/util/ test/org/apache/pig/test/ test/org/apache/pig/test/utils/
Author: gates
Date: Mon Nov 16 22:18:46 2009
New Revision: 881008

URL: http://svn.apache.org/viewvc?rev=881008&view=rev
Log:
PIG-1085 checking in files I missed in the last checkin.

Added:
    hadoop/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestUDFContext.java
    hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java
    hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java
    hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestLoader.java

Added: hadoop/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java?rev=881008&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java Mon Nov 16 22:18:46 2009
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.util;
+
+import java.io.IOException;
+//import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Properties;
+
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.pig.impl.util.ObjectSerializer;
+
+public class UDFContext {
+
+    @SuppressWarnings("deprecation")
+    private JobConf jconf = null;
+    private HashMap<Integer, Properties> udfConfs;
+
+    private static UDFContext self = null;
+
+    private UDFContext() {
+        udfConfs = new HashMap<Integer, Properties>();
+    }
+
+    public static UDFContext getUDFContext() {
+        if (self == null) {
+            self = new UDFContext();
+        }
+        return self;
+    }
+
+    /**
+     * Adds the JobConf to this singleton.  Will be
+     * called on the backend by the Map and Reduce
+     * functions so that UDFs can obtain the JobConf
+     * on the backend.
+     */
+    @SuppressWarnings("deprecation")
+    public void addJobConf(JobConf conf) {
+        jconf = conf;
+    }
+
+    /**
+     * Get the JobConf.  This should only be called on
+     * the backend.  It will return null on the frontend.
+     * @return JobConf for this job.  This is a copy of the
+     * JobConf.  Nothing written here will be kept by the system.
+     * getUDFConf should be used for recording UDF specific
+     * information.
+     */
+    @SuppressWarnings("deprecation")
+    public JobConf getJobConf() {
+        if (jconf != null)  return new JobConf(jconf);
+        else return null;
+    }
+
+    /**
+     * Get a properties object that is specific to this UDF.
+     * Note that if a given UDF is called multiple times in a script,
+     * and each instance passes different arguments, then each will
+     * be provided with a different configuration object.
+     * This can be used by loaders to pass their input object path
+     * or URI and separate themselves from other instances of the
+     * same loader.  Constructor arguments could also be used,
+     * as they are available on both the front and back end.
+     *
+     * Note that this can only be used to share information
+     * across instantiations of the same function in the front end
+     * and between front end and back end.  It cannot be used to
+     * share information between instantiations (that is, between
+     * map and/or reduce instances) on the back end at runtime.
+     * @param c class of the UDF obtaining the properties object.
+     * @param args String arguments that make this instance of
+     * the UDF unique.
+     * @return A reference to the properties object specific to
+     * the calling UDF.  This is a reference, not a copy.
+     * Any changes to this object will automatically be
+     * propagated to other instances of the UDF calling this
+     * function.
+     */
+
+    @SuppressWarnings("unchecked")
+    public Properties getUDFProperties(Class c, String[] args) {
+        Integer k = generateKey(c, args);
+        Properties p = udfConfs.get(k);
+        if (p == null) {
+            p = new Properties();
+            udfConfs.put(k, p);
+        }
+        return p;
+    }
+
+    /**
+     * Get a
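The contract of getUDFProperties can be illustrated with a simplified, hypothetical re-creation. Each UDF class and argument list gets one Properties object, and it is returned by reference rather than copied. The body of generateKey is not shown in the diff, so the key used here (a hash of the class name and the arguments) is an assumption. So is the use of computeIfAbsent in place of the explicit get/put.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Objects;
import java.util.Properties;

/**
 * Hypothetical, simplified re-creation of the per-UDF properties lookup.
 * The key derivation below is an assumption; the real generateKey is not shown.
 */
final class UdfPropertiesSketch {
    private final HashMap<Integer, Properties> udfConfs = new HashMap<>();

    /** Assumed key: combines the UDF class name with its constructor arguments. */
    private static Integer generateKey(Class<?> c, String[] args) {
        return Objects.hash(c.getName(), Arrays.hashCode(args));
    }

    /** Returns the shared (not copied) Properties for this UDF class and argument list. */
    Properties getUDFProperties(Class<?> c, String[] args) {
        return udfConfs.computeIfAbsent(generateKey(c, args), k -> new Properties());
    }
}
```

Every call with matching arguments returns the same object. A loader that writes a property on the front end can therefore read it back later through the same lookup, while two instances with different arguments (for example, different input paths) get separate Properties.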