New attachment added to page PigAccumulatorSpec/homes/yinghe/Desktop on Pig Wiki

2009-11-16 Thread Apache Wiki
Dear Wiki user,

You have subscribed to a wiki page PigAccumulatorSpec/homes/yinghe/Desktop 
for change notification. An attachment has been added to that page by yinghe. 
The following detailed information is available:

Attachment name: SequenceDiagram.jpg
Attachment size: 51846
Attachment link: 
http://wiki.apache.org/pig/PigAccumulatorSpec/homes/yinghe/Desktop?action=AttachFile&do=get&target=SequenceDiagram.jpg
Page link: http://wiki.apache.org/pig/PigAccumulatorSpec/homes/yinghe/Desktop


[Pig Wiki] Update of LoadStoreRedesignProposal by ThejasNair

2009-11-16 Thread Apache Wiki
Dear Wiki user,

You have subscribed to a wiki page or wiki category on Pig Wiki for change 
notification.

The LoadStoreRedesignProposal page has been changed by ThejasNair.
http://wiki.apache.org/pig/LoadStoreRedesignProposal?action=diff&rev1=32&rev2=33

--

  
  '''Problem 2''':
  !PoissonSampleLoader samples 17 tuples from every set of tuples that will fit 
into reducer memory (see PigSkewedJoinSpec). Let us call this number of tuples 
that fit into reducer memory X; i.e., we need to sample one tuple every X/17 
tuples.
- Earlier, the number of tuples to be sampled was calculated before the tuples 
were read, in !PoissonSampleLoader.computeSamples(..). To get the number of 
samples to be taken in a map, the formula used was: 
number-of-reducer-memories-needed * 17 / number-of-splits
+ Earlier, the number of tuples to be sampled was calculated before the tuples 
were read, in !PoissonSampleLoader.computeSamples(..). To get the number of 
samples to be taken in a map, the formula used was: 
number-of-reducer-memories-needed * 17 / number-of-splits <<BR>>
  Where -
- number-of-reducer-memories-needed = (total_file_size * 
disk_to_mem_factor)/available_reducer_heap_size
+ number-of-reducer-memories-needed = (total_file_size * 
disk_to_mem_factor)/available_reducer_heap_size<<BR>>
  disk_to_mem_factor has a default of 2.
  
  Then !PoissonSampleLoader would return sampled tuples by skipping 
split-size/num_samples bytes at a time.
  
- With the new loader we have to skip some number of tuples instead of bytes, but 
we don't have an estimate of the total number of tuples in the input.
+ With the new loader we have to skip some number of tuples instead of bytes, but 
we don't have an estimate of the total number of tuples in the input.<<BR>>
  One way to work around this would be to use the size of a tuple in memory to 
estimate its size on disk via the above disk_to_mem_factor; the number of tuples 
to be skipped would then be (split-size/avg_mem_size_of_tuple)/numSamples
  
  But the use of disk_to_mem_factor is very dubious; the real 
disk_to_mem_factor will vary based on compression algorithm, data 
characteristics (sorting etc.), and encoding.
  
  '''Solution''':
- The goal is to sample one tuple every X/17 tuples. (X = number of tuples that 
fit in available reducer memory)
+ The goal is to sample one tuple every X/17 tuples. (X = number of tuples that 
fit in available reducer memory).<<BR>>
- To estimate X, we can use available_reducer_heap_size/average-tuple-mem-size
+ To estimate X, we can use 
available_reducer_heap_size/average-tuple-mem-size.<<BR>>
  Number of tuples skipped for every sampled tuple = 1/17 * 
(available_reducer_heap_size/average-tuple-mem-size)
  
  The average-tuple-mem-size and 
number-of-tuples-to-be-skipped-per-sampled-tuple are recalculated after a new 
tuple is sampled.
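  As a rough illustration of the arithmetic above, here is a hedged sketch of 
the memory-based skip computation. The class and variable names are invented 
for this example; they are not Pig's actual code.
{{{
// Hypothetical sketch: recompute the skip interval after each sampled tuple.
public class MemorySampleSkipper {
    private static final int SAMPLE_RATE = 17; // tuples sampled per reducer-memory of input

    private long totalTupleMemSize = 0; // running sum of in-memory sizes of sampled tuples
    private long numSampled = 0;

    /**
     * X = number of tuples that fit in available reducer memory; the proposal
     * samples one tuple every X/17 tuples, re-estimating X from the average
     * in-memory tuple size after each new sample.
     */
    public long nextSkipInterval(long sampledTupleMemSize, long availReducerHeap) {
        totalTupleMemSize += sampledTupleMemSize;
        numSampled++;
        long avgTupleMemSize = Math.max(1, totalTupleMemSize / numSampled);
        long tuplesPerReducerMem = availReducerHeap / avgTupleMemSize;
        return Math.max(1, tuplesPerReducerMem / SAMPLE_RATE);
    }
}
}}}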


[Pig Wiki] Update of PigAccumulatorSpec by yinghe

2009-11-16 Thread Apache Wiki
Dear Wiki user,

You have subscribed to a wiki page or wiki category on Pig Wiki for change 
notification.

The PigAccumulatorSpec page has been changed by yinghe.
http://wiki.apache.org/pig/PigAccumulatorSpec?action=diff&rev1=2&rev2=3

--

  }}}
  
  == When to Call Accumulator ==
-  . The MR plan is evaluated by an AccumulatorOptimizer to check if it is 
eligible to run in accumulative mode. Before the AccumulatorOptimizer is called, 
another optimizer, SecondaryKeyOptimizer, should be called first. This optimizer 
checks whether a POSort or PODistinct in the inner plan of foreach can be 
removed or replaced using the secondary sort key supported by Hadoop. A POSort 
is removed; a PODistinct is replaced by POSortedDistinct. Because of this 
optimizer, the last two use cases, with order by and distinct inside the 
foreach inner plan, can still run in accumulative mode.
+  . The MR plan is evaluated by an AccumulatorOptimizer to check if it is 
eligible to run in accumulative mode. Before the AccumulatorOptimizer is called, 
another optimizer, SecondaryKeyOptimizer, should be called first. This optimizer 
checks whether a POSort or PODistinct in the inner plan of foreach can be 
removed or replaced using the secondary sort key supported by Hadoop. A POSort 
is removed; a PODistinct is replaced by POSortedDistinct. Because of this 
optimizer, the last two use cases, with order by and distinct inside the 
foreach inner plan, can still run in accumulative mode. The AccumulatorOptimizer 
checks the reducer plan and enables the accumulator if the following criteria 
are met:
-  The AccumulatorOptimizer checks the reducer plan and enables the accumulator 
if the following criteria are met:
* The reducer plan uses POPackage as its root, not any of its sub-classes. 
The POPackage is not for distinct, and none of its inputs is set as inner.
* The successor of POPackage is a POForeach.
* The leaf of each POForEach input plan is an ExpressionOperator, and it 
must be one of the following:
@@ -109, +108 @@

  
  {{attachment:/homes/yinghe/Desktop/SequenceDiagram.jpg}}
  
+ == Internal Changes ==
+ === Accumulator ===
+  . A new interface that a UDF can implement if it can run in accumulative mode.
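A minimal sketch of what such an interface might look like (the signatures are 
illustrative of this spec, not necessarily the final committed API):
{{{
import java.io.IOException;
import org.apache.pig.data.Tuple;

public interface Accumulator<T> {
    /** Pass the tuples of the current batch to the UDF. Called once per batch. */
    void accumulate(Tuple batch) throws IOException;
    /** Called after the last batch to obtain the final result. */
    T getValue();
    /** Reset state so the same instance can be reused for the next key. */
    void cleanup();
}
}}}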
+ 
+ === PhysicalOperator ===
+  . Add new methods setAccumulative(), setAccumStart(), setAccumEnd() to flag 
a physical operator to run in accumulative mode, and to mark the start and end 
of accumulation. This change is in the patch for PIG-1038.
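A hedged sketch of these flag methods (the field names and the isAccumulative() 
accessor are assumptions added for illustration):
{{{
public abstract class PhysicalOperator /* ... existing members elided ... */ {
    protected boolean accumulative = false; // operator runs in accumulative mode
    protected boolean accumStart = false;   // a batch is currently being accumulated

    public void setAccumulative() { accumulative = true; }
    public boolean isAccumulative() { return accumulative; }
    public void setAccumStart() { accumStart = true; }  // mark start of accumulation
    public void setAccumEnd() { accumStart = false; }   // mark end: produce final result
}
}}}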
+ 
+ === MapReduceLauncher ===
+  . Create AccumulatorOptimizer and use it to visit the plan.
+ 
+ === AccumulatorOptimizer ===
+  . Another MROpPlanVisitor. It checks the reduce plan and, if it meets all the 
criteria, sets the accumulative flag on POPackage and POForEach. It is 
created and invoked by MapReduceLauncher.
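A hedged sketch of the eligibility check (illustrative only; the real optimizer 
is an MROpPlanVisitor and performs the complete list of checks above, and the 
helper method name here is hypothetical):
{{{
static boolean eligibleForAccumulativeMode(PhysicalPlan reducePlan) {
    List<PhysicalOperator> roots = reducePlan.getRoots();
    // Root must be POPackage itself, not a sub-class, and not for distinct.
    if (roots.size() != 1 || roots.get(0).getClass() != POPackage.class) {
        return false;
    }
    List<PhysicalOperator> succs = reducePlan.getSuccessors(roots.get(0));
    // The sole successor of POPackage must be a POForEach.
    if (succs == null || succs.size() != 1 || !(succs.get(0) instanceof POForEach)) {
        return false;
    }
    // ... checks on the leaves of each POForEach input plan elided ...
    return true;
}
}}}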
+ 
+ === POStatus ===
+  . Add a new state STATUS_BATCH_OK to indicate a batch is processed 
successfully in accumulative mode.
+ 
+ === POForEach ===
+  . If its accumulative flag is set, the bags passed to it through a tuple 
are AccumulativeBags as opposed to regular tuple bags. It gets the 
AccumulativeTupleBuffer from the bag. Then it runs a while loop, calling 
nextBatch() on the AccumulativeTupleBuffer and passing the input to its inner 
plans. If an inner plan contains any UDF, it returns POStatus.STATUS_BATCH_OK 
when the current batch is processed successfully. When there are no more 
batches to process, POForEach notifies each inner plan that accumulation is 
done, makes a final call to get the result, and exits the while loop. At the 
end, POForEach returns the result to its successor in the reducer plan. The 
operators that call POForEach don't need to know whether POForEach got its 
result through regular mode or accumulative mode.
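A hedged pseudo-Java sketch of that loop (the bag's buffer accessor and the 
processInnerPlans() helper are assumptions, not the committed code):
{{{
AccumulativeTupleBuffer buffer = accumulativeBag.getTupleBuffer(); // assumed accessor
setAccumStart();                          // tell inner plans accumulation begins
while (buffer.hasNextBatch()) {
    buffer.nextBatch();                   // pull the next batch of tuples from the iterator
    Result res = processInnerPlans();     // assumed helper: run inner plans over the batch
    if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
        break;                            // batch failed; stop accumulating
    }
}
setAccumEnd();                            // no more batches: UDFs finalize their state
Result finalResult = processInnerPlans(); // final call returns the accumulated result
}}}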
+ 
+ === AccumulativeBag ===
+  . An implementation of DataBag used by POPackage for processing data in 
accumulative mode. This bag doesn't contain all the tuples from the iterator. 
Instead, it wraps an AccumulativeTupleBuffer, which contains the iterator to 
pull tuples out in batches. Calling iterator() on this bag only gives you the 
tuples for the current batch.
+ 
+ === AccumulativeTupleBuffer ===
+  . An underlying buffer that is shared by all AccumulativeBags (one bag for 
group by, multiple bags for cogroup) generated by POPackage. POPackage has an 
inner class which implements this interface. POPackage creates an instance of 
this buffer and sets it into the AccumulativeBags. This buffer has methods to 
retrieve the next batch of tuples, which in turn call methods of POPackage to 
read tuples out of the iterator and put them in an internal list. The 
AccumulativeBag has access to that list to return an iterator of tuples.
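A minimal sketch of the buffer interface implied by this description (the exact 
signatures are assumptions):
{{{
import java.io.IOException;
import java.util.Iterator;
import org.apache.pig.data.Tuple;

public interface AccumulativeTupleBuffer {
    /** Whether more tuples remain in the underlying reduce-side iterator. */
    boolean hasNextBatch();
    /** Read the next batch of tuples from the iterator into an internal list. */
    void nextBatch() throws IOException;
    /** Iterator over the current batch for the bag at the given input index
     *  (one index for group by, one per input for cogroup). */
    Iterator<Tuple> getTuples(int index);
    /** Discard the current batch. */
    void clear();
}
}}}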
+ 
+ === POPackage ===
+  . If its accumulative flag is set, it creates an AccumulativeBag and an 
AccumulativeTupleBuffer as opposed to creating default tuple bags. It then sets 
the AccumulativeTupleBuffer into the AccumulativeBag, and sets the 
AccumulativeBag into the result tuple.
+  POPackage also has 

svn commit: r880975 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/

2009-11-16 Thread gates
Author: gates
Date: Mon Nov 16 21:43:41 2009
New Revision: 880975

URL: http://svn.apache.org/viewvc?rev=880975&view=rev
Log:
PIG-1085:  Pass JobConf and UDF specific configuration information to UDFs.


Modified:
hadoop/pig/trunk/CHANGES.txt

hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java

hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java

hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=880975&r1=880974&r2=880975&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Nov 16 21:43:41 2009
@@ -23,6 +23,8 @@
 INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
+PIG-1085:  Pass JobConf and UDF specific configuration information to UDFs
+  (gates)
 
 OPTIMIZATIONS
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=880975&r1=880974&r2=880975&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 Mon Nov 16 21:43:41 2009
@@ -81,6 +81,7 @@
 import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.UDFContext;
 
 /**
  * This is compiler class that takes an MROperPlan and converts
@@ -596,6 +597,9 @@
 jobConf.setOutputCommitter(PigOutputCommitter.class);
 Job job = new Job(jobConf);
jobStoreMap.put(job, new Pair<List<POStore>, Path>(storeLocations, 
tmpLocation));
+
+// Serialize the UDF specific context info.
+UDFContext.getUDFContext().serialize(jobConf);
 return job;
 } catch (JobCreationException jce) {
throw jce;

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=880975&r1=880974&r2=880975&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
 Mon Nov 16 21:43:41 2009
@@ -49,6 +49,7 @@
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.impl.util.UDFContext;
 
 public abstract class PigMapBase extends MapReduceBase{
 private static final Tuple DUMMYTUPLE = null;
@@ -166,6 +167,12 @@
 keyType = 
((byte[])ObjectSerializer.deserialize(job.get("pig.map.keytype")))[0];
 
 pigReporter = new ProgressableReporter();
+
+// Get the UDF specific context
+UDFContext udfc = UDFContext.getUDFContext();
+udfc.addJobConf(job);
+udfc.deserialize();
+
 if(!(mp.isEmpty())) {
List<OperatorKey> targetOpKeys = 
(ArrayList<OperatorKey>)ObjectSerializer.deserialize(job.get("map.target.ops"));
@@ -178,7 +185,6 @@
 }
 
 
-
 } catch (IOException ioe) {
String msg = "Problem while configuring map plan.";
 throw new RuntimeException(msg, ioe);

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=880975&r1=880974&r2=880975&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
 Mon Nov 16 21:43:41 2009
@@ -57,6 +57,7 @@
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.SpillableMemoryManager;
+import 

svn commit: r881008 - in /hadoop/pig/trunk: src/org/apache/pig/impl/util/ test/org/apache/pig/test/ test/org/apache/pig/test/utils/

2009-11-16 Thread gates
Author: gates
Date: Mon Nov 16 22:18:46 2009
New Revision: 881008

URL: http://svn.apache.org/viewvc?rev=881008&view=rev
Log:
PIG-1085  checking in files I missed in the last checkin.


Added:
hadoop/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java
hadoop/pig/trunk/test/org/apache/pig/test/TestUDFContext.java
hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java
hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java
hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestLoader.java

Added: hadoop/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java?rev=881008&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java Mon Nov 16 
22:18:46 2009
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.util;
+
+import java.io.IOException;
+//import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Properties;
+
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.pig.impl.util.ObjectSerializer;
+
+public class UDFContext {
+
+@SuppressWarnings("deprecation")
+private JobConf jconf = null;
+private HashMap<Integer, Properties> udfConfs;
+
+private static UDFContext self = null;
+
+private UDFContext() {
+udfConfs = new HashMap<Integer, Properties>();
+}
+
+public static UDFContext getUDFContext() {
+if (self == null) {
+self = new UDFContext();
+}
+return self;
+}
+
+/**
+ * Adds the JobConf to this singleton.  Will be 
+ * called on the backend by the Map and Reduce 
+ * functions so that UDFs can obtain the JobConf
+ * on the backend.
+ */
+@SuppressWarnings("deprecation")
+public void addJobConf(JobConf conf) {
+jconf = conf;
+}
+
+/**
+ * Get the JobConf.  This should only be called on
+ * the backend.  It will return null on the frontend.
+ * @return JobConf for this job.  This is a copy of the
+ * JobConf.  Nothing written here will be kept by the system.
+ * getUDFConf should be used for recording UDF specific
+ * information.
+ */
+@SuppressWarnings("deprecation")
+public JobConf getJobConf() {
+if (jconf != null)  return new JobConf(jconf);
+else return null;
+}
+
+/**
+ * Get a properties object that is specific to this UDF.
+ * Note that if a given UDF is called multiple times in a script, 
+ * and each instance passes different arguments, then each will
+ * be provided with a different configuration object.
+ * This can be used by loaders to pass their input object path
+ * or URI and separate themselves from other instances of the
+ * same loader.  Constructor arguments could also be used,
+ * as they are available on both the front and back end.
+ *
+ * Note that this can only be used to share information
+ * across instantiations of the same function in the front end
+ * and between front end and back end.  It cannot be used to
+ * share information between instantiations (that is, between
+ * map and/or reduce instances) on the back end at runtime.
+ * @param c class of the UDF obtaining the properties object.
+ * @param args String arguments that make this instance of
+ * the UDF unique.
+ * @return A reference to the properties object specific to
+ * the calling UDF.  This is a reference, not a copy.
+ * Any changes to this object will automatically be 
+ * propagated to other instances of the UDF calling this 
+ * function.
+ */
+
+@SuppressWarnings("unchecked")
+public Properties getUDFProperties(Class c, String[] args) {
+Integer k = generateKey(c, args);
+Properties p = udfConfs.get(k);
+if (p == null) {
+p = new Properties();
+udfConfs.put(k, p);
+}
+return p;
+}
+
+ /**
+ * Get a
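
Based on the getUDFProperties() javadoc above, here is a hedged usage sketch of 
the front-end/back-end round trip: a loader records its input location in its 
UDF-specific properties on the front end and reads it back on the back end. The 
loader class, signature arguments, and property key are invented for this 
example; only UDFContext.getUDFContext() and getUDFProperties(Class, String[]) 
come from the committed code above.
{{{
import java.util.Properties;
import org.apache.pig.impl.util.UDFContext;

public class MyLoader /* extends LoadFunc */ {
    // Arguments that make this instance of the UDF unique within the script.
    private final String[] signature = { "/data/input1" };

    // Front end: record the input location in this UDF's private properties.
    // JobControlCompiler later serializes the UDFContext into the JobConf.
    public void setLocation(String location) {
        Properties p = UDFContext.getUDFContext()
                                 .getUDFProperties(getClass(), signature);
        p.setProperty("my.loader.path", location);
    }

    // Back end: after PigMapBase calls addJobConf() and deserialize(), the
    // same properties object is visible to this instance of the loader.
    public String getLocation() {
        Properties p = UDFContext.getUDFContext()
                                 .getUDFProperties(getClass(), signature);
        return p.getProperty("my.loader.path");
    }
}
}}}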