Added: incubator/pig/trunk/src/org/apache/pig/pen/ExGen.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/pen/ExGen.java?rev=647253&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/pen/ExGen.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/pen/ExGen.java Fri Apr 11 11:24:23 
2008
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
+import org.apache.pig.backend.local.executionengine.LocalExecutionEngine;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.ExampleTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.impl.util.LineageTracer;
+
+public class ExGen {
+
+       final static int SAMPLE_SIZE = 10000;    // size of sample used in 
initial downstream pass
+    
+    static Map<LOLoad, DataBag> GlobalBaseData = new HashMap<LOLoad, 
DataBag>();
+    
+    public static Map<LogicalOperator, DataBag> GenerateExamples(LogicalPlan 
plan, PigContext pigContext) throws IOException {
+        
+       long time = System.currentTimeMillis();
+       String Result;
+       
+       //compile the logical plan to get the physical plan once and for all
+       ExecPhysicalPlan PhyPlan = null;
+       try {
+                       PhyPlan = pigContext.getExecutionEngine().compile(plan, 
null);
+               } catch (ExecException e1) {
+                       // TODO Auto-generated catch block
+                       e1.printStackTrace();
+               }
+               Map<OperatorKey, OperatorKey> logicalToPhysicalKeys = 
((LocalExecutionEngine)pigContext.getExecutionEngine()).getLogicalToPhysicalMap();
+               Map<OperatorKey, ExecPhysicalOperator> physicalOpTable = 
PhyPlan.getOpTable();
+               
+        // Acquire initial base data by sampling from input relations (this is 
idempotent)
+       FetchBaseData.ReadBaseData(plan.getRootOperator(), GlobalBaseData, 
SAMPLE_SIZE, pigContext);
+               
+        /////// PASS 1: push data sample through query plan
+        
+        // Push base data through query plan
+        Map<LogicalOperator, DataBag> derivedData;
+        Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> 
OperatorToEqClasses = new HashMap<LogicalOperator, 
Collection<IdentityHashSet<Tuple>>>();
+        Collection<IdentityHashSet<Tuple>> equivalenceClasses = new 
LinkedList<IdentityHashSet<Tuple>>();
+        derivedData = DerivedData.CreateDerivedData(plan.getRootOperator(), 
GlobalBaseData, logicalToPhysicalKeys, physicalOpTable);
+//        derivedData = DerivedData.CreateDerivedData(plan.getRootOperator(), 
GlobalBaseData, logicalToPhysicalKeys, physicalOpTable);
+        
+        /////// PASS 2: augment data sample to reduce sparsity
+        
+        // Use constraint back-prop to synthesize additional base data, in 
order to reduce sparsity
+        // (and keep track of which tuples are synthetic)
+        IdentityHashSet<Tuple> syntheticTuples = new IdentityHashSet<Tuple>(); 
 
+        Map<LOLoad, DataBag> modifiedBaseData = 
AugmentData.AugmentBaseData(plan.getRootOperator(), GlobalBaseData, 
syntheticTuples, derivedData, pigContext);
+        
+        {
+               LineageTracer lineage = new LineageTracer();
+               derivedData = 
DerivedData.CreateDerivedData(plan.getRootOperator(), modifiedBaseData, 
lineage, equivalenceClasses, OperatorToEqClasses, logicalToPhysicalKeys, 
physicalOpTable);
+               modifiedBaseData = ShapeLineage.PruneBaseData(modifiedBaseData, 
derivedData.get(plan.getRoot()), syntheticTuples, lineage, equivalenceClasses);
+
+        }
+
+        {
+               LineageTracer lineage = new LineageTracer();
+               derivedData = 
DerivedData.CreateDerivedData(plan.getRootOperator(), modifiedBaseData, 
lineage, equivalenceClasses, OperatorToEqClasses, logicalToPhysicalKeys, 
physicalOpTable);
+               modifiedBaseData = 
ShapeLineage.TrimLineages(plan.getRootOperator(), modifiedBaseData, 
derivedData, lineage, OperatorToEqClasses, logicalToPhysicalKeys, 
physicalOpTable);
+        }
+        
+        {
+               LineageTracer lineage = new LineageTracer();
+               derivedData = 
DerivedData.CreateDerivedData(plan.getRootOperator(), modifiedBaseData, 
lineage, null, null, logicalToPhysicalKeys, physicalOpTable);
+        
+        }
+        
+        // Push finalized base data through query plan, to generate final 
answer
+        // also mark the tuples being displayed non-omittable
+
+        //derivedData = CreateDerivedData(plan.getRootOperator(), 
modifiedBaseData, logicalToPhysicalKeys, physicalOpTable);
+        /*for(Map.Entry<LogicalOperator, DataBag> e : derivedData.entrySet()) {
+               DataBag bag = e.getValue();
+               for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+                       ExampleTuple t = (ExampleTuple) it.next();
+                       t.omittable = false;
+                       
+               }
+        }*/
+        
+        /*Result = PrintExamples(plan, derivedData, OperatorToEqClasses);
+        System.out.println("Time taken : " + (System.currentTimeMillis() - 
time) + "ms");
+        return Result;*/
+        return derivedData;
+    }
+    
+}

Added: incubator/pig/trunk/src/org/apache/pig/pen/FetchBaseData.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/pen/FetchBaseData.java?rev=647253&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/pen/FetchBaseData.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/pen/FetchBaseData.java Fri Apr 11 
11:24:23 2008
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.backend.local.executionengine.POLoad;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
+import org.apache.pig.impl.util.LineageTracer;
+
+public class FetchBaseData {
+       static void ReadBaseData(LogicalOperator op, Map<LOLoad, DataBag> 
baseData, int sampleSize, PigContext pigContext) throws IOException {
+           
+       if (baseData == null) {
+               throw new IOException("BaseData is null!");
+       } 
+
+       DataBag test = baseData.get(op);
+       if(test != null) {
+               //The data exists locally and need not be fetched again
+               return;
+       }
+       
+        if (op instanceof LOLoad) {
+               FileSpec fileSpec = ((LOLoad)op).getInputFileSpec(); 
+               
+               if(op.outputSchema().fields.isEmpty()) {
+                       throw new IOException("Illustrate command needs a user 
defined schema to function. Please specify a schema while loading the data.");
+               }
+
+               DataBag opBaseData = BagFactory.getInstance().newDefaultBag();
+            //POLoad poLoad = new POLoad(pigContext, ((LOLoad) 
op).getInputFileSpec(), op.getOutputType());
+               POLoad poLoad = new POLoad(op.getScope(), 
+                                                                       
NodeIdGenerator.getGenerator().getNextNodeId(op.getOperatorKey().getScope()), 
+                                                                       new 
HashMap<OperatorKey, ExecPhysicalOperator> (),
+                                                                       
pigContext,
+                                                                       
fileSpec,
+                                                                       
LogicalOperator.FIXED
+                                                                       );
+               poLoad.setLineageTracer(new LineageTracer());
+            poLoad.open();
+            for (int i = 0; i < sampleSize; i++) {
+                Tuple t = poLoad.getNext();
+
+                if (t == null) break;
+                opBaseData.add(t);
+            }
+            poLoad.close();
+            
+            baseData.put((LOLoad) op, opBaseData);
+        } else {
+            /*for (Iterator<LogicalOperator> it = op.getInputs().iterator(); 
it.hasNext(); ) {
+                ReadBaseData(it.next(), baseData, sampleSize, pigContext);
+            }*/
+               for(OperatorKey opKey : op.getInputs()) {
+                       ReadBaseData(op.getOpTable().get(opKey), baseData, 
sampleSize, pigContext);
+               }
+        }
+    }
+}

Added: incubator/pig/trunk/src/org/apache/pig/pen/Metrics.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/pen/Metrics.java?rev=647253&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/pen/Metrics.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/pen/Metrics.java Fri Apr 11 11:24:23 
2008
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.ExampleTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.eval.FilterSpec;
+import org.apache.pig.impl.logicalLayer.LOEval;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.util.IdentityHashSet;
+
+public class Metrics {
+       static float getRealness(LogicalOperator op, Map<LogicalOperator, 
DataBag> exampleData, boolean overallRealness) {
+       //StringBuffer str = new StringBuffer();
+       int noTuples = 0;
+       int noSynthetic = 0;
+       for(Map.Entry<LogicalOperator, DataBag> e : exampleData.entrySet()) {
+               //if(e.getKey() instanceof LORead) continue;
+               DataBag bag;
+               if(overallRealness) {
+                       bag = exampleData.get(e.getKey());
+               } else {
+                       bag = exampleData.get(op);
+               }
+               noTuples += bag.cardinality();
+               for(Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
+                       if(((ExampleTuple)it.next()).isSynthetic()) 
noSynthetic++;
+               }
+               if(!overallRealness) break;
+
+               }
+       
+       if(overallRealness) {
+               return 100*(1 - ((float)noSynthetic / (float)noTuples));
+       } else {
+               return 100*(1 - ((float)noSynthetic / (float)noTuples));
+       }
+       
+    }
+
+    static float getConciseness(LogicalOperator op, Map<LogicalOperator, 
DataBag> exampleData, Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> 
OperatorToEqClasses, boolean overallConciseness) {
+       DataBag bag = exampleData.get(op);
+
+       int noEqCl = OperatorToEqClasses.get(op).size();
+       int noTuples = bag.cardinality();
+       
+       
+       float conciseness = 100*((float)noEqCl / (float)noTuples);
+       if(!overallConciseness) {
+       
+               return ((conciseness > 100.0) ? 100.0f : conciseness);
+       } else {
+
+       
+               noEqCl = 0;
+               noTuples = 0;
+               conciseness = 0;
+               int noOperators = 0;
+
+               for(Map.Entry<LogicalOperator, 
Collection<IdentityHashSet<Tuple>>> e : OperatorToEqClasses.entrySet()) {
+                       //if(e.getKey() instanceof LORead) continue;
+                       noOperators++; //we need to keep a track of these and 
not use OperatorToEqClasses.size() as LORead shouldn't be considered a operator
+                       bag = exampleData.get(e.getKey());
+       
+                       noTuples = bag.cardinality();
+                       noEqCl = e.getValue().size();
+                       float concise = 100*((float)noEqCl / (float)noTuples);
+                       concise = (concise > 100) ? 100 : concise;
+                       conciseness += concise;
+               }
+               conciseness /= (float)noOperators;
+               
+               return conciseness;
+       }
+    
+    }
+    
+    
+    static float getCompleteness(LogicalOperator op, Map<LogicalOperator, 
DataBag> exampleData, Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> 
OperatorToEqClasses, boolean overallCompleteness) {
+    
+       int noClasses = 0;
+       int noCoveredClasses = 0;
+       int noOperators = 0;
+       Map<Integer, Boolean> coveredClasses;
+       float completeness = 0;
+       if(!overallCompleteness) {
+               Collection<IdentityHashSet<Tuple>> eqClasses = 
OperatorToEqClasses.get(op);
+               DataBag bag;
+               if(op instanceof LOEval && ((LOEval)op).getSpec() instanceof 
FilterSpec) {
+                       //bag = exampleData.get(op.getInputs().get(0));
+                       bag = 
exampleData.get(op.getOpTable().get(op.getInputs().get(0)));
+               } else {
+                       bag = exampleData.get(op);
+               }
+               coveredClasses = getCompletenessLogic(bag, eqClasses);
+               noClasses = eqClasses.size();
+               for(Map.Entry<Integer, Boolean> e : coveredClasses.entrySet() ) 
{
+                       if(e.getValue()) {
+                               noCoveredClasses ++;
+                       }
+               }
+               
+    
+               return 100*((float)noCoveredClasses)/(float)noClasses;
+       } else {
+               for(Map.Entry<LogicalOperator, 
Collection<IdentityHashSet<Tuple>>> e : OperatorToEqClasses.entrySet()) {
+                       noCoveredClasses = 0;
+                       noClasses = 0;
+                       
+                       //if(e.getKey() instanceof LORead) continue; //We don't 
consider LORead a operator.
+                       
+                       noOperators++;
+                       Collection<IdentityHashSet<Tuple>> eqClasses = 
e.getValue();
+                       LogicalOperator lop = e.getKey();
+                       DataBag bag;
+                       if(lop instanceof LOEval && ((LOEval)lop).getSpec() 
instanceof FilterSpec) {
+                               //bag = exampleData.get(lop.getInputs().get(0));
+                               bag = 
exampleData.get(lop.getOpTable().get(lop.getInputs().get(0)));
+                       } else {
+                               bag = exampleData.get(lop);
+                       }
+                       coveredClasses = getCompletenessLogic(bag, eqClasses);
+                       noClasses += eqClasses.size();
+                       for(Map.Entry<Integer, Boolean> e_result : 
coveredClasses.entrySet() ) {
+                               if(e_result.getValue()) {
+                                       noCoveredClasses ++;
+                               }
+                       }
+                       completeness += 
100*((float)noCoveredClasses/(float)noClasses);
+               }
+               completeness /= (float)noOperators;
+    
+               return completeness;
+       }
+       
+    
+    }
+    
+    static Map<Integer, Boolean> getCompletenessLogic(DataBag bag, 
Collection<IdentityHashSet<Tuple>> eqClasses) {
+       Map<Integer, Boolean> coveredClasses = new HashMap<Integer, Boolean> ();
+       
+               for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+                       Tuple t = it.next();
+                       int classId = 0;
+                       for(IdentityHashSet<Tuple> eqClass : eqClasses) {
+
+                               if(eqClass.contains(t)) {
+                                       coveredClasses.put(classId, true);
+                               }
+                               classId ++;
+                       }
+               }
+
+       
+               return coveredClasses;
+       
+    }
+}

Added: incubator/pig/trunk/src/org/apache/pig/pen/ShapeLineage.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/pen/ShapeLineage.java?rev=647253&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/pen/ShapeLineage.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/pen/ShapeLineage.java Fri Apr 11 
11:24:23 2008
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.eval.FilterSpec;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOEval;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.impl.util.LineageTracer;
+
+
+public class ShapeLineage {
+       static Map<LOLoad, DataBag> TrimLineages(LogicalOperator root, 
Map<LOLoad, DataBag> baseData, Map<LogicalOperator, DataBag> derivedData, 
LineageTracer lineage, Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> 
OperatorToEqClasses, Map<OperatorKey, OperatorKey> logicalToPhysicalKeys, 
Map<OperatorKey, ExecPhysicalOperator> physicalOpTable) {
+       Map <LOLoad, DataBag> modifiedBaseData = new HashMap<LOLoad, DataBag>();
+       //modifiedBaseData.putAll(baseData);
+       
+       List<IdentityHashSet<Tuple>> affinityGroups = new 
LinkedList<IdentityHashSet<Tuple>>();
+       IdentityHashSet<Tuple> a1 = new IdentityHashSet<Tuple>();
+       affinityGroups.add(a1);
+       IdentityHashSet<Tuple> a2 = new IdentityHashSet<Tuple>();
+       affinityGroups.add(a2);
+       TrimLineages(root, root, modifiedBaseData, derivedData, affinityGroups, 
lineage, OperatorToEqClasses, 0.0, logicalToPhysicalKeys, physicalOpTable);
+       
+       return modifiedBaseData;
+       
+    }
+    
+    static void TrimLineages(LogicalOperator root, LogicalOperator currentOp, 
Map<LOLoad, DataBag> baseData, Map<LogicalOperator, DataBag> derivedData, 
List<IdentityHashSet<Tuple>> affinityGroups, LineageTracer lineage, 
Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OperatorToEqClasses, 
double completeness, Map<OperatorKey, OperatorKey> logicalToPhysicalKeys, 
Map<OperatorKey, ExecPhysicalOperator> physicalOpTable) {
+       //With lineage added in the lineageTracer to track the parent-child 
relationship, only filter needs a consideration since we need a record that 
doesn't satisfy the filter condition
+       IdentityHashSet<Tuple> affinityGroup = affinityGroups.get(0);
+       if(affinityGroup.size() == 0) {
+               //first operator/root in the logical plan
+               DataBag bag = derivedData.get(currentOp);
+               for(Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
+                       Tuple t = it.next();
+                       if(lineage.flattenLineageContains(t)) {
+                               affinityGroup.add(t);
+                               break;
+                       }
+                       affinityGroup.add(t);
+               }
+       }
+       
+       
+       //if any member of the affinity group is present in the flattenLineage, 
that needs to be handled specially
+       IdentityHashSet<Tuple> flattenElements = affinityGroups.get(1);
+       
+       IdentityHashSet<Tuple> newAffinityGroup = new IdentityHashSet<Tuple>();
+       if(currentOp instanceof LOEval) {
+               EvalSpec spec = ((LOEval)currentOp).getSpec();
+               if(spec instanceof FilterSpec) {
+                       //here we need to put a Tuple that doesn't pass the 
filter into the affinity groups.
+                       LogicalOperator input = 
currentOp.getOpTable().get(currentOp.getInputs().get(0)); //since its a filter, 
we know there is only 1 input.
+                       DataBag inputData = derivedData.get(input);
+                       for(Iterator<Tuple> it = inputData.iterator(); 
it.hasNext();) {
+                               Tuple t = it.next();
+                               if(!((FilterSpec)spec).cond.eval(t)) {
+                                       newAffinityGroup.add(t);
+                                       break;
+                               }
+                       }
+               }
+               for(Tuple t : affinityGroup) {
+                       if(lineage.flattenLineageContains(t)) {
+                               
flattenElements.addAll(lineage.getFlattenChildren(t));
+                       }
+                       newAffinityGroup.addAll(lineage.getChildren(t));
+               }
+               //Affinity group for eval ready, calling recursion
+               affinityGroup.clear();
+               affinityGroup.addAll(newAffinityGroup);
+               TrimLineages(root, 
currentOp.getOpTable().get(currentOp.getInputs().get(0)), baseData, 
derivedData, affinityGroups, lineage, OperatorToEqClasses, completeness, 
logicalToPhysicalKeys, physicalOpTable);
+       }
+       
+       if(currentOp instanceof LOCogroup) {
+               
+               List<OperatorKey> inputs = currentOp.getInputs();
+               int numInputs = inputs.size();
+               IdentityHashSet<Tuple> flattenAffinityGroup = 
affinityGroups.get(1);
+               
+               
+               if(numInputs == 1) {
+                       //this is a group case
+                       double score = 0;
+                       double bestScore = -1;
+                       int best = 0;
+                       double nextBestScore = -1;
+                       int nextBest = 0;
+                       for(Tuple t : affinityGroup) {
+                               int index = 0;
+                               List<Tuple> children = lineage.getChildren(t);
+                               if(children.size() == 1) {
+                                       Tuple test = children.get(0);
+                                       if(!newAffinityGroup.contains(test))
+                                               newAffinityGroup.add(test);
+                               } else {
+                                       if(flattenAffinityGroup.size() > 0) {
+                                               
newAffinityGroup.addAll(flattenAffinityGroup);
+                                               flattenAffinityGroup.clear();
+                                       } else {
+                                               for(Tuple child : children) {
+                                                       score = 1 / 
lineage.getWeightedCounts(child, 2, 1);
+                                                       if(score > bestScore) {
+                                                               nextBest = best;
+                                                               best = index;
+                                                               nextBestScore = 
bestScore;
+                                                               bestScore = 
score;
+                                                       } else if(score > 
nextBestScore) {
+                                                               nextBest = 
index;
+                                                               nextBestScore = 
score;
+                                                       }
+                                                       index++;
+                                               }
+
+                                               
newAffinityGroup.add(children.get(best));
+                                               
newAffinityGroup.add(children.get(nextBest));
+                                       }
+                               }
+                       }
+                       affinityGroup.clear();
+                       affinityGroup.addAll(newAffinityGroup);
+                       TrimLineages(root, 
currentOp.getOpTable().get(inputs.get(0)), baseData, derivedData, 
affinityGroups, lineage, OperatorToEqClasses, completeness, 
logicalToPhysicalKeys, physicalOpTable);
+               } else {
+                       //This is a cogroup case
+                       newAffinityGroup.addAll(affinityGroup);
+                       for(int i = 0; i < numInputs; i++) {
+                               IdentityHashSet<Tuple> cogroupAffinityGroup = 
new IdentityHashSet<Tuple>();
+                               for(Tuple t : newAffinityGroup) {
+                                       DataBag data = t.getBagField(i+1);
+                                               //Ideally we should have no 
field with null values because of synthetic data generation. 
+                                               if(data.size() == 0)
+                                                       continue;
+                                               
+                                               if(flattenAffinityGroup.size() 
> 0) {
+                                                       for(Iterator<Tuple> it 
= data.iterator(); it.hasNext(); ) {
+                                                               Tuple child = 
it.next();
+                                                               
if(flattenAffinityGroup.contains(child)) {
+                                                                       
cogroupAffinityGroup.add(child);
+                                                                       
flattenAffinityGroup.remove(child);
+                                                               }
+                                                       }
+                                               } else {
+                                                       //The children are all 
the tuples present in data
+                                                       Tuple best = null;
+                                                       double bestScore = -1;
+                                                       for(Iterator<Tuple> it 
= data.iterator(); it.hasNext(); ) {
+                                                               Tuple child = 
it.next();
+
+                                                               double score = 
1 / lineage.getWeightedCounts(child, 2, 1);
+                                                               if(score > 
bestScore) {
+                                                                       best = 
child;
+                                                                       
bestScore = score;
+                                                               }
+                                                       }
+                                                       
cogroupAffinityGroup.add(best);
+                                               }
+                               }
+                               affinityGroup.clear();
+                               affinityGroup.addAll(cogroupAffinityGroup);
+                               TrimLineages(root, 
currentOp.getOpTable().get(inputs.get(i)), baseData, derivedData, 
affinityGroups, lineage, OperatorToEqClasses, completeness, 
logicalToPhysicalKeys, physicalOpTable);
+                       }
+               }
+       }
+       
+       
+       if(currentOp instanceof LOLoad) {
+               
+               getBaseData(currentOp, affinityGroup, lineage, baseData, 
derivedData);
+       }
+               
+       return;
+       
+    }
+    
+       
+
+    static void getBaseData(LogicalOperator lOp, IdentityHashSet<Tuple> 
affinityGroup, LineageTracer lineage, Map<LOLoad, DataBag> baseData, 
Map<LogicalOperator, DataBag> derivedData) {
+       DataBag bag = baseData.get(lOp);
+       IdentityHashSet<Tuple> temp = new IdentityHashSet<Tuple>();
+       if(bag == null) {
+               bag = BagFactory.getInstance().newDefaultBag();
+               baseData.put((LOLoad) lOp, bag);
+       } else {
+               //Now we try to ensure that the same tuple is not added twice
+               //In effect we are trying to get the union of multiple updates 
happening to the baseData
+               for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+                       temp.add(it.next());
+               }
+               bag.clear();
+       }
+       
+       for(Tuple t : affinityGroup) {
+               temp.add(t);
+       }
+       for(Tuple t : temp) {
+               bag.add(t);
+       }
+       return;
+    }
+    
+    static Map<LOLoad, DataBag> PruneBaseData(Map<LOLoad, DataBag> baseData, 
DataBag rootOutput, IdentityHashSet<Tuple> syntheticTuples, LineageTracer 
lineage, Collection<IdentityHashSet<Tuple>> equivalenceClasses) {
+        
+        IdentityHashMap<Tuple, Collection<Tuple>> membershipMap = 
lineage.getMembershipMap();
+        IdentityHashMap<Tuple, Double> lineageGroupWeights = 
lineage.getWeightedCounts(2f, 1);
+        
+        // compute a mapping from lineage group to the set of equivalence 
classes covered by it
+        IdentityHashMap<Tuple, Set<Integer>> lineageGroupToEquivClasses = new 
IdentityHashMap<Tuple, Set<Integer>>();
+        int equivClassId = 0;
+        for (IdentityHashSet<Tuple> equivClass : equivalenceClasses) {
+            for (Tuple t : equivClass) {
+                Tuple lineageGroup = lineage.getRepresentative(t);
+                Set<Integer> entry = 
lineageGroupToEquivClasses.get(lineageGroup);
+                if (entry == null) {
+                    entry = new HashSet<Integer>();
+                    lineageGroupToEquivClasses.put(lineageGroup, entry);
+                }
+                entry.add(equivClassId);
+            }
+            
+            equivClassId++;
+        }
+        
+        // select lineage groups such that we cover all equivalence classes
+        IdentityHashSet<Tuple> selectedLineageGroups = new 
IdentityHashSet<Tuple>();
+        while (!lineageGroupToEquivClasses.isEmpty()) {
+            // greedily find the lineage group with the best "score", where 
score = # equiv classes covered / group weight
+            double bestScore = -1;
+            Tuple bestLineageGroup = null;
+            Set<Integer> bestEquivClassesCovered = null;
+            for (Tuple lineageGroup : lineageGroupToEquivClasses.keySet()) {
+                double weight = lineageGroupWeights.get(lineageGroup);
+
+                Set<Integer> equivClassesCovered = 
lineageGroupToEquivClasses.get(lineageGroup);
+                int numEquivClassesCovered = equivClassesCovered.size();
+                double score = ((double) numEquivClassesCovered) / 
((double)weight);
+                
+                if (score > bestScore) {
+                       
+                    bestScore = score;
+                    bestLineageGroup = lineageGroup;
+                    bestEquivClassesCovered = equivClassesCovered;
+                }
+            }
+            // add the best-scoring lineage group to the set of ones we plan 
to retain
+            selectedLineageGroups.add(bestLineageGroup);
+
+            // make copy of bestEquivClassesCovered (or else the code that 
follows won't work correctly, because removing from the reference set)
+            Set<Integer> toCopy = bestEquivClassesCovered;
+            bestEquivClassesCovered = new HashSet<Integer>();
+            bestEquivClassesCovered.addAll(toCopy);
+            
+            // remove the classes we've now covered
+            Collection<Tuple> toRemove = new LinkedList<Tuple>();
+            for (Tuple lineageGroup : lineageGroupToEquivClasses.keySet()) {
+                Set<Integer> equivClasses = 
lineageGroupToEquivClasses.get(lineageGroup);
+                equivClasses.removeAll(bestEquivClassesCovered);
+                if (equivClasses.size() == 0) toRemove.add(lineageGroup);
+            }
+            for (Tuple removeMe : toRemove) 
lineageGroupToEquivClasses.remove(removeMe);
+        }
+        
+        // revise baseData to only contain the tuples that are part of 
selectedLineageGroups
+        IdentityHashSet<Tuple> tuplesToRetain = new IdentityHashSet<Tuple>();
+        for (Tuple lineageGroup : selectedLineageGroups) {
+            Collection<Tuple> members = membershipMap.get(lineageGroup);
+            for (Tuple t : members) tuplesToRetain.add(t);
+        }
+        Map<LOLoad, DataBag> newBaseData = new HashMap<LOLoad, DataBag>();
+        for (LOLoad loadOp : baseData.keySet()) {
+            DataBag data = baseData.get(loadOp);
+            //DataBag newData = new DataBag();
+            DataBag newData = BagFactory.getInstance().newDefaultBag();
+            for (Iterator<Tuple> it = data.iterator(); it.hasNext(); ) {
+                Tuple t = it.next();
+                if (tuplesToRetain.contains(t)) newData.add(t);
+            }
+            newBaseData.put(loadOp, newData);
+        }
+        return newBaseData;
+    }
+
+}

Modified: incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=647253&r1=647252&r2=647253&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Fri Apr 11 11:24:23 2008
@@ -126,6 +126,10 @@
     protected void processDescribe(String alias) throws IOException {
         mPigServer.dumpSchema(alias);
     }
+    
+    protected void processIllustrate(String alias) throws IOException {
+       mPigServer.showExamples(alias);
+    }
 
     protected void processExplain(String alias) throws IOException {
         mPigServer.explain(alias, System.out);

Modified: 
incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=647253&r1=647252&r2=647253&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original)
+++ incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Fri Apr 11 11:24:23 2008
@@ -45,6 +45,8 @@
       abstract protected void quit(); // abstract hook; the concrete parser subclass supplies the behavior
       
       abstract protected void processDescribe(String alias) throws 
IOException; // called by the DESCRIBE <IDENTIFIER> production with t1.image
+       
+       abstract protected void processIllustrate(String alias) throws 
IOException; // called by the ILLUSTRATE <IDENTIFIER> production with t1.image
 
    abstract protected void processExplain(String alias) throws IOException; // presumably called by the EXPLAIN <IDENTIFIER> production (only partly visible here)
        
@@ -121,6 +123,7 @@
 TOKEN: {<REGISTER: "register">}
 TOKEN: {<REMOVE: "rm">}
 TOKEN: {<SET: "set">}
+TOKEN: {<ILLUSTRATE: "illustrate">} // keyword for 'illustrate <alias>', dispatched to processIllustrate
 
 // internal use commands
 TOKEN: {<SCRIPT_DONE: "scriptDone">}
@@ -293,6 +296,10 @@
        <DESCRIBE>
        t1 = <IDENTIFIER>
        {processDescribe(t1.image);}
+       |
+       <ILLUSTRATE>
+       t1 = <IDENTIFIER>
+       {processIllustrate(t1.image);}
        |
     <EXPLAIN>
        t1 = <IDENTIFIER>

Added: incubator/pig/trunk/test/org/apache/pig/test/TestExGenCogroup.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestExGenCogroup.java?rev=647253&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestExGenCogroup.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestExGenCogroup.java Fri Apr 11 11:24:23 2008
@@ -0,0 +1,91 @@
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.Random;
+
+import org.apache.pig.PigServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestExGenCogroup extends TestCase{
+       File A, B;
+       private int MAX = 10;
+       
+       String initString = "local";
+       PigServer pig;
+       
+       @Override
+       @Before
+       protected void setUp() throws Exception{
+               System.out.println("Generating test data...");
+               A = File.createTempFile("dataA", ".dat");
+               B = File.createTempFile("dataB", ".dat");
+               
+               writeData(A);
+               writeData(B);
+               System.out.println("Test data created.");
+               
+       }
+       
+       private void writeData(File dataFile) throws Exception{
+               //File dataFile = File.createTempFile(name, ".dat");
+               FileOutputStream dat = new FileOutputStream(dataFile);
+               
+               Random rand = new Random();
+               
+               for(int i = 0; i < MAX; i++) 
+                       dat.write((rand.nextInt(10) + "\t" + rand.nextInt(10) + 
"\n").getBytes());
+                       
+       }
+       
+       @Override
+       @After
+       protected void tearDown() throws Exception {
+               A.delete();
+               B.delete();
+       
+       }
+       
+       @Test
+       public void testCogroupMultipleCols() throws Exception {
+               pig = new PigServer(initString);
+               pig.registerQuery("A = load '" + A.toString() + "' as (x, y);");
+               pig.registerQuery("B = load '" + B.toString() + "' as (x, y);");
+               pig.registerQuery("C = cogroup A by (x, y), B by (x, y);");
+               pig.showExamples("C");
+       }
+       
+       @Test
+       public void testCogroup() throws Exception {
+               pig = new PigServer(initString);
+               pig.registerQuery("A = load '" + A.toString() + "' as (x, y);");
+               pig.registerQuery("B = load '" + B.toString() + "' as (x, y);");
+               pig.registerQuery("C = cogroup A by x, B by x;");
+               pig.showExamples("C");
+       }
+       
+       @Test
+       public void testGroup() throws Exception {
+               pig = new PigServer(initString);
+               pig.registerQuery("A = load '" + A.toString() + "' as (x, y);");
+               pig.registerQuery("B = group A by x;");
+               pig.showExamples("B");
+               
+       }
+       
+       @Test
+       public void testComplexGroup() throws Exception {
+               pig = new PigServer(initString);
+               pig.registerQuery("A = load '" + A.toString() + "' as (x, y);");
+               pig.registerQuery("B = load '" + B.toString() + "' as (x, y);");
+               pig.registerQuery("C = cogroup A by x, B by x;");
+               pig.registerQuery("D = cogroup A by y, B by y;");
+               pig.registerQuery("E = cogroup C by $0, D by $0;");
+               pig.showExamples("E");
+       }
+       
+}

Added: incubator/pig/trunk/test/org/apache/pig/test/TestExGenEval.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestExGenEval.java?rev=647253&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestExGenEval.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestExGenEval.java Fri Apr 11 11:24:23 2008
@@ -0,0 +1,91 @@
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.Random;
+
+import org.apache.pig.PigServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestExGenEval extends TestCase {
+       
+       File A, B, C, D;
+       private int MAX = 10;
+       
+       String initString = "local";
+       PigServer pig;
+       
+       @Override
+       @Before
+       protected void setUp() throws Exception{
+               System.out.println("Generating test data...");
+               A = File.createTempFile("dataA", ".dat");
+               B = File.createTempFile("dataB", ".dat");
+               C = File.createTempFile("dataC", ".dat");
+               D = File.createTempFile("dataD", ".dat");
+               
+               writeData(A);
+               writeData(B);
+               writeData(C);
+               writeData(D);
+               System.out.println("Test data created.");
+               
+       }
+       
+       private void writeData(File dataFile) throws Exception{
+               //File dataFile = File.createTempFile(name, ".dat");
+               FileOutputStream dat = new FileOutputStream(dataFile);
+               
+               Random rand = new Random();
+               
+               for(int i = 0; i < MAX; i++) 
+                       dat.write((rand.nextInt(10) + "\t" + rand.nextInt(10) + 
"\n").getBytes());
+                       
+       }
+       
+       @Override
+       @After
+       protected void tearDown() throws Exception {
+               A.delete();
+               B.delete();
+               C.delete();
+               D.delete();
+       }
+       
+       @Test
+       public void testForeach() throws Exception {
+               pig = new PigServer(initString);
+               System.out.println("Testing Foreach statement...");
+               pig.registerQuery("A = load '" + A.toString() + "' as (x, y);");
+               pig.registerQuery("B = foreach A generate x+y as sum;");
+               pig.showExamples("B");
+               assertEquals(1, 1);
+       }
+       
+       @Test 
+       public void testFilter() throws Exception {
+               pig = new PigServer(initString);
+               pig.registerQuery("A = load '" + A.toString() + "' as (x, y);");
+               pig.registerQuery("B = filter A by x < 10.0;");
+               pig.showExamples("B");
+               assertEquals(1, 1);
+       }
+       
+       @Test
+       public void testFlatten() throws Exception {
+               pig = new PigServer(initString);
+               pig.registerQuery("A1 = load '" + A.toString() + "' as (x, 
y);");
+               pig.registerQuery("B1 = load '" + B.toString() + "' as (x, 
y);");
+               pig.registerQuery("C1 = load '" + C.toString() + "' as (x, 
y);");
+               pig.registerQuery("D1 = load '" + D.toString() + "' as (x, 
y);");
+               pig.registerQuery("E = join A1 by x, B1 by x;");
+               pig.registerQuery("F = join C1 by x, D1 by x;");
+               pig.registerQuery("G = join E by $0, F by $0;");
+               pig.showExamples("G");
+               assertEquals(1, 1);
+       }
+}


Reply via email to