Added: incubator/pig/trunk/src/org/apache/pig/pen/ExGen.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/pen/ExGen.java?rev=647253&view=auto ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/pen/ExGen.java (added) +++ incubator/pig/trunk/src/org/apache/pig/pen/ExGen.java Fri Apr 11 11:24:23 2008 @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.pen; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; + +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.executionengine.ExecPhysicalOperator; +import org.apache.pig.backend.executionengine.ExecPhysicalPlan; +import org.apache.pig.backend.local.executionengine.LocalExecutionEngine; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.ExampleTuple; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.logicalLayer.LOLoad; +import org.apache.pig.impl.logicalLayer.LogicalOperator; +import org.apache.pig.impl.logicalLayer.LogicalPlan; +import org.apache.pig.impl.logicalLayer.OperatorKey; +import org.apache.pig.impl.util.IdentityHashSet; +import org.apache.pig.impl.util.LineageTracer; + +public class ExGen { + + final static int SAMPLE_SIZE = 10000; // size of sample used in initial downstream pass + + static Map<LOLoad, DataBag> GlobalBaseData = new HashMap<LOLoad, DataBag>(); + + public static Map<LogicalOperator, DataBag> GenerateExamples(LogicalPlan plan, PigContext pigContext) throws IOException { + + long time = System.currentTimeMillis(); + String Result; + + //compile the logical plan to get the physical plan once and for all + ExecPhysicalPlan PhyPlan = null; + try { + PhyPlan = pigContext.getExecutionEngine().compile(plan, null); + } catch (ExecException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } + Map<OperatorKey, OperatorKey> logicalToPhysicalKeys = ((LocalExecutionEngine)pigContext.getExecutionEngine()).getLogicalToPhysicalMap(); + Map<OperatorKey, ExecPhysicalOperator> physicalOpTable = PhyPlan.getOpTable(); + + // Acquire initial base data by sampling from input relations (this is idempotent) + FetchBaseData.ReadBaseData(plan.getRootOperator(), GlobalBaseData, SAMPLE_SIZE, 
pigContext); + + /////// PASS 1: push data sample through query plan + + // Push base data through query plan + Map<LogicalOperator, DataBag> derivedData; + Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OperatorToEqClasses = new HashMap<LogicalOperator, Collection<IdentityHashSet<Tuple>>>(); + Collection<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>(); + derivedData = DerivedData.CreateDerivedData(plan.getRootOperator(), GlobalBaseData, logicalToPhysicalKeys, physicalOpTable); +// derivedData = DerivedData.CreateDerivedData(plan.getRootOperator(), GlobalBaseData, logicalToPhysicalKeys, physicalOpTable); + + /////// PASS 2: augment data sample to reduce sparsity + + // Use constraint back-prop to synthesize additional base data, in order to reduce sparsity + // (and keep track of which tuples are synthetic) + IdentityHashSet<Tuple> syntheticTuples = new IdentityHashSet<Tuple>(); + Map<LOLoad, DataBag> modifiedBaseData = AugmentData.AugmentBaseData(plan.getRootOperator(), GlobalBaseData, syntheticTuples, derivedData, pigContext); + + { + LineageTracer lineage = new LineageTracer(); + derivedData = DerivedData.CreateDerivedData(plan.getRootOperator(), modifiedBaseData, lineage, equivalenceClasses, OperatorToEqClasses, logicalToPhysicalKeys, physicalOpTable); + modifiedBaseData = ShapeLineage.PruneBaseData(modifiedBaseData, derivedData.get(plan.getRoot()), syntheticTuples, lineage, equivalenceClasses); + + } + + { + LineageTracer lineage = new LineageTracer(); + derivedData = DerivedData.CreateDerivedData(plan.getRootOperator(), modifiedBaseData, lineage, equivalenceClasses, OperatorToEqClasses, logicalToPhysicalKeys, physicalOpTable); + modifiedBaseData = ShapeLineage.TrimLineages(plan.getRootOperator(), modifiedBaseData, derivedData, lineage, OperatorToEqClasses, logicalToPhysicalKeys, physicalOpTable); + } + + { + LineageTracer lineage = new LineageTracer(); + derivedData = 
DerivedData.CreateDerivedData(plan.getRootOperator(), modifiedBaseData, lineage, null, null, logicalToPhysicalKeys, physicalOpTable); + + } + + // Push finalized base data through query plan, to generate final answer + // also mark the tuples being displayed non-omittable + + //derivedData = CreateDerivedData(plan.getRootOperator(), modifiedBaseData, logicalToPhysicalKeys, physicalOpTable); + /*for(Map.Entry<LogicalOperator, DataBag> e : derivedData.entrySet()) { + DataBag bag = e.getValue(); + for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { + ExampleTuple t = (ExampleTuple) it.next(); + t.omittable = false; + + } + }*/ + + /*Result = PrintExamples(plan, derivedData, OperatorToEqClasses); + System.out.println("Time taken : " + (System.currentTimeMillis() - time) + "ms"); + return Result;*/ + return derivedData; + } + +}
Added: incubator/pig/trunk/src/org/apache/pig/pen/FetchBaseData.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/pen/FetchBaseData.java?rev=647253&view=auto ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/pen/FetchBaseData.java (added) +++ incubator/pig/trunk/src/org/apache/pig/pen/FetchBaseData.java Fri Apr 11 11:24:23 2008 @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.pen; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.pig.backend.executionengine.ExecPhysicalOperator; +import org.apache.pig.backend.local.executionengine.POLoad; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.FileSpec; +import org.apache.pig.impl.logicalLayer.LOLoad; +import org.apache.pig.impl.logicalLayer.LogicalOperator; +import org.apache.pig.impl.logicalLayer.OperatorKey; +import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator; +import org.apache.pig.impl.util.LineageTracer; + +public class FetchBaseData { + static void ReadBaseData(LogicalOperator op, Map<LOLoad, DataBag> baseData, int sampleSize, PigContext pigContext) throws IOException { + + if (baseData == null) { + throw new IOException("BaseData is null!"); + } + + DataBag test = baseData.get(op); + if(test != null) { + //The data exists locally and need not be fetched again + return; + } + + if (op instanceof LOLoad) { + FileSpec fileSpec = ((LOLoad)op).getInputFileSpec(); + + if(op.outputSchema().fields.isEmpty()) { + throw new IOException("Illustrate command needs a user defined schema to function. 
Please specify a schema while loading the data."); + } + + DataBag opBaseData = BagFactory.getInstance().newDefaultBag(); + //POLoad poLoad = new POLoad(pigContext, ((LOLoad) op).getInputFileSpec(), op.getOutputType()); + POLoad poLoad = new POLoad(op.getScope(), + NodeIdGenerator.getGenerator().getNextNodeId(op.getOperatorKey().getScope()), + new HashMap<OperatorKey, ExecPhysicalOperator> (), + pigContext, + fileSpec, + LogicalOperator.FIXED + ); + poLoad.setLineageTracer(new LineageTracer()); + poLoad.open(); + for (int i = 0; i < sampleSize; i++) { + Tuple t = poLoad.getNext(); + + if (t == null) break; + opBaseData.add(t); + } + poLoad.close(); + + baseData.put((LOLoad) op, opBaseData); + } else { + /*for (Iterator<LogicalOperator> it = op.getInputs().iterator(); it.hasNext(); ) { + ReadBaseData(it.next(), baseData, sampleSize, pigContext); + }*/ + for(OperatorKey opKey : op.getInputs()) { + ReadBaseData(op.getOpTable().get(opKey), baseData, sampleSize, pigContext); + } + } + } +} Added: incubator/pig/trunk/src/org/apache/pig/pen/Metrics.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/pen/Metrics.java?rev=647253&view=auto ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/pen/Metrics.java (added) +++ incubator/pig/trunk/src/org/apache/pig/pen/Metrics.java Fri Apr 11 11:24:23 2008 @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.pen; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import org.apache.pig.data.DataBag; +import org.apache.pig.data.ExampleTuple; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.eval.FilterSpec; +import org.apache.pig.impl.logicalLayer.LOEval; +import org.apache.pig.impl.logicalLayer.LogicalOperator; +import org.apache.pig.impl.util.IdentityHashSet; + +public class Metrics { + static float getRealness(LogicalOperator op, Map<LogicalOperator, DataBag> exampleData, boolean overallRealness) { + //StringBuffer str = new StringBuffer(); + int noTuples = 0; + int noSynthetic = 0; + for(Map.Entry<LogicalOperator, DataBag> e : exampleData.entrySet()) { + //if(e.getKey() instanceof LORead) continue; + DataBag bag; + if(overallRealness) { + bag = exampleData.get(e.getKey()); + } else { + bag = exampleData.get(op); + } + noTuples += bag.cardinality(); + for(Iterator<Tuple> it = bag.iterator(); it.hasNext();) { + if(((ExampleTuple)it.next()).isSynthetic()) noSynthetic++; + } + if(!overallRealness) break; + + } + + if(overallRealness) { + return 100*(1 - ((float)noSynthetic / (float)noTuples)); + } else { + return 100*(1 - ((float)noSynthetic / (float)noTuples)); + } + + } + + static float getConciseness(LogicalOperator op, Map<LogicalOperator, DataBag> exampleData, Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OperatorToEqClasses, boolean overallConciseness) { + DataBag bag = exampleData.get(op); + + int noEqCl = 
OperatorToEqClasses.get(op).size(); + int noTuples = bag.cardinality(); + + + float conciseness = 100*((float)noEqCl / (float)noTuples); + if(!overallConciseness) { + + return ((conciseness > 100.0) ? 100.0f : conciseness); + } else { + + + noEqCl = 0; + noTuples = 0; + conciseness = 0; + int noOperators = 0; + + for(Map.Entry<LogicalOperator, Collection<IdentityHashSet<Tuple>>> e : OperatorToEqClasses.entrySet()) { + //if(e.getKey() instanceof LORead) continue; + noOperators++; //we need to keep a track of these and not use OperatorToEqClasses.size() as LORead shouldn't be considered a operator + bag = exampleData.get(e.getKey()); + + noTuples = bag.cardinality(); + noEqCl = e.getValue().size(); + float concise = 100*((float)noEqCl / (float)noTuples); + concise = (concise > 100) ? 100 : concise; + conciseness += concise; + } + conciseness /= (float)noOperators; + + return conciseness; + } + + } + + + static float getCompleteness(LogicalOperator op, Map<LogicalOperator, DataBag> exampleData, Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OperatorToEqClasses, boolean overallCompleteness) { + + int noClasses = 0; + int noCoveredClasses = 0; + int noOperators = 0; + Map<Integer, Boolean> coveredClasses; + float completeness = 0; + if(!overallCompleteness) { + Collection<IdentityHashSet<Tuple>> eqClasses = OperatorToEqClasses.get(op); + DataBag bag; + if(op instanceof LOEval && ((LOEval)op).getSpec() instanceof FilterSpec) { + //bag = exampleData.get(op.getInputs().get(0)); + bag = exampleData.get(op.getOpTable().get(op.getInputs().get(0))); + } else { + bag = exampleData.get(op); + } + coveredClasses = getCompletenessLogic(bag, eqClasses); + noClasses = eqClasses.size(); + for(Map.Entry<Integer, Boolean> e : coveredClasses.entrySet() ) { + if(e.getValue()) { + noCoveredClasses ++; + } + } + + + return 100*((float)noCoveredClasses)/(float)noClasses; + } else { + for(Map.Entry<LogicalOperator, Collection<IdentityHashSet<Tuple>>> e : 
OperatorToEqClasses.entrySet()) { + noCoveredClasses = 0; + noClasses = 0; + + //if(e.getKey() instanceof LORead) continue; //We don't consider LORead a operator. + + noOperators++; + Collection<IdentityHashSet<Tuple>> eqClasses = e.getValue(); + LogicalOperator lop = e.getKey(); + DataBag bag; + if(lop instanceof LOEval && ((LOEval)lop).getSpec() instanceof FilterSpec) { + //bag = exampleData.get(lop.getInputs().get(0)); + bag = exampleData.get(lop.getOpTable().get(lop.getInputs().get(0))); + } else { + bag = exampleData.get(lop); + } + coveredClasses = getCompletenessLogic(bag, eqClasses); + noClasses += eqClasses.size(); + for(Map.Entry<Integer, Boolean> e_result : coveredClasses.entrySet() ) { + if(e_result.getValue()) { + noCoveredClasses ++; + } + } + completeness += 100*((float)noCoveredClasses/(float)noClasses); + } + completeness /= (float)noOperators; + + return completeness; + } + + + } + + static Map<Integer, Boolean> getCompletenessLogic(DataBag bag, Collection<IdentityHashSet<Tuple>> eqClasses) { + Map<Integer, Boolean> coveredClasses = new HashMap<Integer, Boolean> (); + + for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { + Tuple t = it.next(); + int classId = 0; + for(IdentityHashSet<Tuple> eqClass : eqClasses) { + + if(eqClass.contains(t)) { + coveredClasses.put(classId, true); + } + classId ++; + } + } + + + return coveredClasses; + + } +} Added: incubator/pig/trunk/src/org/apache/pig/pen/ShapeLineage.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/pen/ShapeLineage.java?rev=647253&view=auto ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/pen/ShapeLineage.java (added) +++ incubator/pig/trunk/src/org/apache/pig/pen/ShapeLineage.java Fri Apr 11 11:24:23 2008 @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.eval.FilterSpec;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOEval;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.impl.util.LineageTracer;
+
+
+/**
+ * Reduces base data to the tuples needed by the examples: TrimLineages walks
+ * the plan from the root down to the loads collecting tuples by lineage, and
+ * PruneBaseData keeps only the lineage groups selected by a greedy cover of
+ * the equivalence classes.
+ */
+public class ShapeLineage {
+    /**
+     * Builds a new base-data map by walking the plan from <code>root</code>.
+     *
+     * Starts the recursive overload with an empty result map, two empty
+     * affinity groups (index 0: the working affinity group, index 1: the
+     * flatten elements) and a completeness of 0.0. The <code>baseData</code>
+     * argument is not read here (the putAll is commented out).
+     *
+     * @return the map filled in by the recursion, one bag per load operator reached
+     */
+    static Map<LOLoad, DataBag> TrimLineages(LogicalOperator root, Map<LOLoad, DataBag> baseData, Map<LogicalOperator, DataBag> derivedData, LineageTracer lineage, Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OperatorToEqClasses, Map<OperatorKey, OperatorKey> logicalToPhysicalKeys, Map<OperatorKey, ExecPhysicalOperator> physicalOpTable) {
+        Map <LOLoad, DataBag> modifiedBaseData = new HashMap<LOLoad, DataBag>();
+        //modifiedBaseData.putAll(baseData);
+
+        List<IdentityHashSet<Tuple>> affinityGroups = new LinkedList<IdentityHashSet<Tuple>>();
+        IdentityHashSet<Tuple> a1 = new IdentityHashSet<Tuple>();
+        affinityGroups.add(a1);
+        IdentityHashSet<Tuple> a2 = new IdentityHashSet<Tuple>();
+        affinityGroups.add(a2);
+        TrimLineages(root, root, modifiedBaseData, derivedData, affinityGroups, lineage, OperatorToEqClasses, 0.0, logicalToPhysicalKeys, physicalOpTable);
+
+        return modifiedBaseData;
+
+    }
+
+    /**
+     * Recursive step: rewrites the shared affinity group (affinityGroups.get(0))
+     * from tuples of <code>currentOp</code> into tuples of its input(s), then
+     * recurses into the input(s). At a load operator the affinity group is
+     * merged into <code>baseData</code> via getBaseData.
+     *
+     * The two sets in <code>affinityGroups</code> are mutated in place and are
+     * shared by all recursion levels. Within this method the parameters
+     * <code>root</code>, <code>OperatorToEqClasses</code>,
+     * <code>completeness</code>, <code>logicalToPhysicalKeys</code> and
+     * <code>physicalOpTable</code> are only passed on to the recursive calls.
+     */
+    static void TrimLineages(LogicalOperator root, LogicalOperator currentOp, Map<LOLoad, DataBag> baseData, Map<LogicalOperator, DataBag> derivedData, List<IdentityHashSet<Tuple>> affinityGroups, LineageTracer lineage, Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OperatorToEqClasses, double completeness, Map<OperatorKey, OperatorKey> logicalToPhysicalKeys, Map<OperatorKey, ExecPhysicalOperator> physicalOpTable) {
+        //With lineage added in the lineageTracer to track the parent-child relationship, only filter needs a consideration since we need a record that doesn't satisfy the filter condition
+        IdentityHashSet<Tuple> affinityGroup = affinityGroups.get(0);
+        if(affinityGroup.size() == 0) {
+            //first operator/root in the logical plan
+            // Seed the group with the operator's tuples, stopping after the first
+            // tuple that the flatten lineage contains (that tuple is included).
+            // NOTE(review): this also runs at deeper levels whenever the group
+            // arrives empty -- confirm that is intended.
+            DataBag bag = derivedData.get(currentOp);
+            for(Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
+                Tuple t = it.next();
+                if(lineage.flattenLineageContains(t)) {
+                    affinityGroup.add(t);
+                    break;
+                }
+                affinityGroup.add(t);
+            }
+        }
+
+
+        //if any member of the affinity group is present in the flattenLineage, that needs to be handled specially
+        IdentityHashSet<Tuple> flattenElements = affinityGroups.get(1);
+
+        IdentityHashSet<Tuple> newAffinityGroup = new IdentityHashSet<Tuple>();
+        if(currentOp instanceof LOEval) {
+            EvalSpec spec = ((LOEval)currentOp).getSpec();
+            if(spec instanceof FilterSpec) {
+                //here we need to put a Tuple that doesn't pass the filter into the affinity groups.
+                LogicalOperator input = currentOp.getOpTable().get(currentOp.getInputs().get(0)); //since its a filter, we know there is only 1 input.
+                DataBag inputData = derivedData.get(input);
+                // only the first input tuple rejected by the filter condition is added
+                for(Iterator<Tuple> it = inputData.iterator(); it.hasNext();) {
+                    Tuple t = it.next();
+                    if(!((FilterSpec)spec).cond.eval(t)) {
+                        newAffinityGroup.add(t);
+                        break;
+                    }
+                }
+            }
+            // replace every tuple of the group by its lineage children; flatten
+            // children are collected separately in the shared flatten set
+            for(Tuple t : affinityGroup) {
+                if(lineage.flattenLineageContains(t)) {
+                    flattenElements.addAll(lineage.getFlattenChildren(t));
+                }
+                newAffinityGroup.addAll(lineage.getChildren(t));
+            }
+            //Affinity group for eval ready, calling recursion
+            affinityGroup.clear();
+            affinityGroup.addAll(newAffinityGroup);
+            TrimLineages(root, currentOp.getOpTable().get(currentOp.getInputs().get(0)), baseData, derivedData, affinityGroups, lineage, OperatorToEqClasses, completeness, logicalToPhysicalKeys, physicalOpTable);
+        }
+
+        if(currentOp instanceof LOCogroup) {
+
+            List<OperatorKey> inputs = currentOp.getInputs();
+            int numInputs = inputs.size();
+            // same set as flattenElements above (affinityGroups.get(1))
+            IdentityHashSet<Tuple> flattenAffinityGroup = affinityGroups.get(1);
+
+
+            if(numInputs == 1) {
+                //this is a group case
+                // NOTE(review): bestScore/best/nextBestScore/nextBest are declared
+                // outside the loop over the affinity group and are not reset per
+                // tuple, while index restarts at 0 for every tuple. For a later
+                // tuple, best/nextBest can therefore still hold positions found in
+                // an earlier tuple's children list -- confirm this is intended.
+                double score = 0;
+                double bestScore = -1;
+                int best = 0;
+                double nextBestScore = -1;
+                int nextBest = 0;
+                for(Tuple t : affinityGroup) {
+                    int index = 0;
+                    List<Tuple> children = lineage.getChildren(t);
+                    if(children.size() == 1) {
+                        Tuple test = children.get(0);
+                        if(!newAffinityGroup.contains(test))
+                            newAffinityGroup.add(test);
+                    } else {
+                        if(flattenAffinityGroup.size() > 0) {
+                            // pending flatten elements take precedence and are consumed here
+                            newAffinityGroup.addAll(flattenAffinityGroup);
+                            flattenAffinityGroup.clear();
+                        } else {
+                            // keep the two children with the highest score,
+                            // where score = 1 / getWeightedCounts(child, 2, 1)
+                            for(Tuple child : children) {
+                                score = 1 / lineage.getWeightedCounts(child, 2, 1);
+                                if(score > bestScore) {
+                                    nextBest = best;
+                                    best = index;
+                                    nextBestScore = bestScore;
+                                    bestScore = score;
+                                } else if(score > nextBestScore) {
+                                    nextBest = index;
+                                    nextBestScore = score;
+                                }
+                                index++;
+                            }
+
+                            newAffinityGroup.add(children.get(best));
+                            newAffinityGroup.add(children.get(nextBest));
+                        }
+                    }
+                }
+                affinityGroup.clear();
+                affinityGroup.addAll(newAffinityGroup);
+                TrimLineages(root, currentOp.getOpTable().get(inputs.get(0)), baseData, derivedData, affinityGroups, lineage, OperatorToEqClasses, completeness, logicalToPhysicalKeys, physicalOpTable);
+            } else {
+                //This is a cogroup case
+                // snapshot the group: the shared affinityGroup is overwritten for
+                // each input before recursing into that input
+                newAffinityGroup.addAll(affinityGroup);
+                for(int i = 0; i < numInputs; i++) {
+                    IdentityHashSet<Tuple> cogroupAffinityGroup = new IdentityHashSet<Tuple>();
+                    for(Tuple t : newAffinityGroup) {
+                        // bag field i+1 of the cogroup tuple is read for input i
+                        DataBag data = t.getBagField(i+1);
+                        //Ideally we should have no field with null values because of synthetic data generation.
+                        if(data.size() == 0)
+                            continue;
+
+                        if(flattenAffinityGroup.size() > 0) {
+                            // move the pending flatten elements found in this bag into the group
+                            for(Iterator<Tuple> it = data.iterator(); it.hasNext(); ) {
+                                Tuple child = it.next();
+                                if(flattenAffinityGroup.contains(child)) {
+                                    cogroupAffinityGroup.add(child);
+                                    flattenAffinityGroup.remove(child);
+                                }
+                            }
+                        } else {
+                            //The children are all the tuples present in data
+                            // keep the single child with the highest score
+                            Tuple best = null;
+                            double bestScore = -1;
+                            for(Iterator<Tuple> it = data.iterator(); it.hasNext(); ) {
+                                Tuple child = it.next();
+
+                                double score = 1 / lineage.getWeightedCounts(child, 2, 1);
+                                if(score > bestScore) {
+                                    best = child;
+                                    bestScore = score;
+                                }
+                            }
+                            cogroupAffinityGroup.add(best);
+                        }
+                    }
+                    affinityGroup.clear();
+                    affinityGroup.addAll(cogroupAffinityGroup);
+                    TrimLineages(root, currentOp.getOpTable().get(inputs.get(i)), baseData, derivedData, affinityGroups, lineage, OperatorToEqClasses, completeness, logicalToPhysicalKeys, physicalOpTable);
+                }
+            }
+        }
+
+
+        if(currentOp instanceof LOLoad) {
+
+            getBaseData(currentOp, affinityGroup, lineage, baseData, derivedData);
+        }
+
+        return;
+
+    }
+
+
+
+    /**
+     * Merges the tuples of <code>affinityGroup</code> into the bag stored in
+     * <code>baseData</code> for the load operator <code>lOp</code>, creating
+     * the bag if it is missing. Tuples are de-duplicated by identity. The
+     * <code>lineage</code> and <code>derivedData</code> parameters are not used.
+     *
+     * @param lOp operator whose bag is updated; cast to LOLoad when a new bag is stored
+     */
+    static void getBaseData(LogicalOperator lOp, IdentityHashSet<Tuple> affinityGroup, LineageTracer lineage, Map<LOLoad, DataBag> baseData, Map<LogicalOperator, DataBag> derivedData) {
+        DataBag bag = baseData.get(lOp);
+        IdentityHashSet<Tuple> temp = new IdentityHashSet<Tuple>();
+        if(bag == null) {
+            bag = BagFactory.getInstance().newDefaultBag();
+            baseData.put((LOLoad) lOp, bag);
+        } else {
+            //Now we try to ensure that the same tuple is not added twice
+            //In effect we are trying to get the union of multiple updates happening to the baseData
+            for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+                temp.add(it.next());
+            }
+            bag.clear();
+        }
+
+        for(Tuple t : affinityGroup) {
+            temp.add(t);
+        }
+        for(Tuple t : temp) {
+            bag.add(t);
+        }
+        return;
+    }
+
+    /**
+     * Returns a copy of <code>baseData</code> restricted to the members of
+     * greedily selected lineage groups.
+     *
+     * Lineage groups are chosen one at a time, each time taking the group with
+     * the highest ratio of still-uncovered equivalence classes to group weight,
+     * until every class that maps to a lineage group is covered. The
+     * <code>rootOutput</code> and <code>syntheticTuples</code> parameters are
+     * not used.
+     *
+     * @return a new map with one new bag per key of <code>baseData</code>; the input map is not modified
+     */
+    static Map<LOLoad, DataBag> PruneBaseData(Map<LOLoad, DataBag> baseData, DataBag rootOutput, IdentityHashSet<Tuple> syntheticTuples, LineageTracer lineage, Collection<IdentityHashSet<Tuple>> equivalenceClasses) {
+
+        IdentityHashMap<Tuple, Collection<Tuple>> membershipMap = lineage.getMembershipMap();
+        IdentityHashMap<Tuple, Double> lineageGroupWeights = lineage.getWeightedCounts(2f, 1);
+
+        // compute a mapping from lineage group to the set of equivalence classes covered by it
+        IdentityHashMap<Tuple, Set<Integer>> lineageGroupToEquivClasses = new IdentityHashMap<Tuple, Set<Integer>>();
+        int equivClassId = 0;
+        for (IdentityHashSet<Tuple> equivClass : equivalenceClasses) {
+            for (Tuple t : equivClass) {
+                Tuple lineageGroup = lineage.getRepresentative(t);
+                Set<Integer> entry = lineageGroupToEquivClasses.get(lineageGroup);
+                if (entry == null) {
+                    entry = new HashSet<Integer>();
+                    lineageGroupToEquivClasses.put(lineageGroup, entry);
+                }
+                entry.add(equivClassId);
+            }
+
+            equivClassId++;
+        }
+
+        // select lineage groups such that we cover all equivalence classes
+        IdentityHashSet<Tuple> selectedLineageGroups = new IdentityHashSet<Tuple>();
+        while (!lineageGroupToEquivClasses.isEmpty()) {
+            // greedily find the lineage group with the best "score", where score = # equiv classes covered / group weight
+            double bestScore = -1;
+            Tuple bestLineageGroup = null;
+            Set<Integer> bestEquivClassesCovered = null;
+            for (Tuple lineageGroup : lineageGroupToEquivClasses.keySet()) {
+                // unboxes the stored weight; presumably every lineage group has one -- TODO confirm
+                double weight = lineageGroupWeights.get(lineageGroup);
+
+                Set<Integer> equivClassesCovered = lineageGroupToEquivClasses.get(lineageGroup);
+                int numEquivClassesCovered = equivClassesCovered.size();
+                double score = ((double) numEquivClassesCovered) / ((double)weight);
+
+                if (score > bestScore) {
+
+                    bestScore = score;
+                    bestLineageGroup = lineageGroup;
+                    bestEquivClassesCovered = equivClassesCovered;
+                }
+            }
+            // add the best-scoring lineage group to the set of ones we plan to retain
+            selectedLineageGroups.add(bestLineageGroup);
+
+            // make copy of bestEquivClassesCovered (or else the code that follows won't work correctly, because removing from the reference set)
+            Set<Integer> toCopy = bestEquivClassesCovered;
+            bestEquivClassesCovered = new HashSet<Integer>();
+            bestEquivClassesCovered.addAll(toCopy);
+
+            // remove the classes we've now covered
+            // groups left with nothing to cover are dropped after the iteration
+            Collection<Tuple> toRemove = new LinkedList<Tuple>();
+            for (Tuple lineageGroup : lineageGroupToEquivClasses.keySet()) {
+                Set<Integer> equivClasses = lineageGroupToEquivClasses.get(lineageGroup);
+                equivClasses.removeAll(bestEquivClassesCovered);
+                if (equivClasses.size() == 0) toRemove.add(lineageGroup);
+            }
+            for (Tuple removeMe : toRemove) lineageGroupToEquivClasses.remove(removeMe);
+        }
+
+        // revise baseData to only contain the tuples that are part of selectedLineageGroups
+        IdentityHashSet<Tuple> tuplesToRetain = new IdentityHashSet<Tuple>();
+        for (Tuple lineageGroup : selectedLineageGroups) {
+            Collection<Tuple> members = membershipMap.get(lineageGroup);
+            for (Tuple t : members) tuplesToRetain.add(t);
+        }
+        Map<LOLoad, DataBag> newBaseData = new HashMap<LOLoad, DataBag>();
+        for (LOLoad loadOp : baseData.keySet()) {
+            DataBag data = baseData.get(loadOp);
+            //DataBag newData = new DataBag();
+            DataBag newData = BagFactory.getInstance().newDefaultBag();
+            for (Iterator<Tuple> it = data.iterator(); it.hasNext(); ) {
+                Tuple t = it.next();
+                if (tuplesToRetain.contains(t)) newData.add(t);
+            }
+            newBaseData.put(loadOp, newData);
+        }
+        return newBaseData;
+    }
+
+}
Modified: incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=647253&r1=647252&r2=647253&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original) +++ incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Fri Apr 11 11:24:23 2008 @@ -126,6 +126,10 @@ protected void processDescribe(String alias) throws IOException { mPigServer.dumpSchema(alias); } + + protected void processIllustrate(String alias) throws IOException { + mPigServer.showExamples(alias); + } protected void processExplain(String alias) throws IOException { mPigServer.explain(alias, System.out); Modified: incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=647253&r1=647252&r2=647253&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original) +++ incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Fri Apr 11 11:24:23 2008 @@ -45,6 +45,8 @@ abstract protected void quit(); abstract protected void processDescribe(String alias) throws IOException; + + abstract protected void processIllustrate(String alias) throws IOException; abstract protected void processExplain(String alias) throws IOException; @@ -121,6 +123,7 @@ TOKEN: {<REGISTER: "register">} TOKEN: {<REMOVE: "rm">} TOKEN: {<SET: "set">} +TOKEN: {<ILLUSTRATE: "illustrate">} // internal use commands TOKEN: {<SCRIPT_DONE: "scriptDone">} @@ -293,6 +296,10 @@ <DESCRIBE> t1 = 
<IDENTIFIER> {processDescribe(t1.image);} + | + <ILLUSTRATE> + t1 = <IDENTIFIER> + {processIllustrate(t1.image);} | <EXPLAIN> t1 = <IDENTIFIER> Added: incubator/pig/trunk/test/org/apache/pig/test/TestExGenCogroup.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestExGenCogroup.java?rev=647253&view=auto ============================================================================== --- incubator/pig/trunk/test/org/apache/pig/test/TestExGenCogroup.java (added) +++ incubator/pig/trunk/test/org/apache/pig/test/TestExGenCogroup.java Fri Apr 11 11:24:23 2008 @@ -0,0 +1,91 @@ +package org.apache.pig.test; + +import java.io.File; +import java.io.FileOutputStream; +import java.util.Random; + +import org.apache.pig.PigServer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import junit.framework.TestCase; + +public class TestExGenCogroup extends TestCase{ + File A, B; + private int MAX = 10; + + String initString = "local"; + PigServer pig; + + @Override + @Before + protected void setUp() throws Exception{ + System.out.println("Generating test data..."); + A = File.createTempFile("dataA", ".dat"); + B = File.createTempFile("dataB", ".dat"); + + writeData(A); + writeData(B); + System.out.println("Test data created."); + + } + + private void writeData(File dataFile) throws Exception{ + //File dataFile = File.createTempFile(name, ".dat"); + FileOutputStream dat = new FileOutputStream(dataFile); + + Random rand = new Random(); + + for(int i = 0; i < MAX; i++) + dat.write((rand.nextInt(10) + "\t" + rand.nextInt(10) + "\n").getBytes()); + + } + + @Override + @After + protected void tearDown() throws Exception { + A.delete(); + B.delete(); + + } + + @Test + public void testCogroupMultipleCols() throws Exception { + pig = new PigServer(initString); + pig.registerQuery("A = load '" + A.toString() + "' as (x, y);"); + pig.registerQuery("B = load '" + B.toString() + "' as (x, y);"); + pig.registerQuery("C = cogroup A by 
(x, y), B by (x, y);"); + pig.showExamples("C"); + } + + @Test + public void testCogroup() throws Exception { + pig = new PigServer(initString); + pig.registerQuery("A = load '" + A.toString() + "' as (x, y);"); + pig.registerQuery("B = load '" + B.toString() + "' as (x, y);"); + pig.registerQuery("C = cogroup A by x, B by x;"); + pig.showExamples("C"); + } + + @Test + public void testGroup() throws Exception { + pig = new PigServer(initString); + pig.registerQuery("A = load '" + A.toString() + "' as (x, y);"); + pig.registerQuery("B = group A by x;"); + pig.showExamples("B"); + + } + + @Test + public void testComplexGroup() throws Exception { + pig = new PigServer(initString); + pig.registerQuery("A = load '" + A.toString() + "' as (x, y);"); + pig.registerQuery("B = load '" + B.toString() + "' as (x, y);"); + pig.registerQuery("C = cogroup A by x, B by x;"); + pig.registerQuery("D = cogroup A by y, B by y;"); + pig.registerQuery("E = cogroup C by $0, D by $0;"); + pig.showExamples("E"); + } + +} Added: incubator/pig/trunk/test/org/apache/pig/test/TestExGenEval.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestExGenEval.java?rev=647253&view=auto ============================================================================== --- incubator/pig/trunk/test/org/apache/pig/test/TestExGenEval.java (added) +++ incubator/pig/trunk/test/org/apache/pig/test/TestExGenEval.java Fri Apr 11 11:24:23 2008 @@ -0,0 +1,91 @@ +package org.apache.pig.test; + +import java.io.File; +import java.io.FileOutputStream; +import java.util.Random; + +import org.apache.pig.PigServer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import junit.framework.TestCase; + +public class TestExGenEval extends TestCase { + + File A, B, C, D; + private int MAX = 10; + + String initString = "local"; + PigServer pig; + + @Override + @Before + protected void setUp() throws Exception{ + System.out.println("Generating test data..."); + A = 
File.createTempFile("dataA", ".dat"); + B = File.createTempFile("dataB", ".dat"); + C = File.createTempFile("dataC", ".dat"); + D = File.createTempFile("dataD", ".dat"); + + writeData(A); + writeData(B); + writeData(C); + writeData(D); + System.out.println("Test data created."); + + } + + private void writeData(File dataFile) throws Exception{ + //File dataFile = File.createTempFile(name, ".dat"); + FileOutputStream dat = new FileOutputStream(dataFile); + + Random rand = new Random(); + + for(int i = 0; i < MAX; i++) + dat.write((rand.nextInt(10) + "\t" + rand.nextInt(10) + "\n").getBytes()); + + } + + @Override + @After + protected void tearDown() throws Exception { + A.delete(); + B.delete(); + C.delete(); + D.delete(); + } + + @Test + public void testForeach() throws Exception { + pig = new PigServer(initString); + System.out.println("Testing Foreach statement..."); + pig.registerQuery("A = load '" + A.toString() + "' as (x, y);"); + pig.registerQuery("B = foreach A generate x+y as sum;"); + pig.showExamples("B"); + assertEquals(1, 1); + } + + @Test + public void testFilter() throws Exception { + pig = new PigServer(initString); + pig.registerQuery("A = load '" + A.toString() + "' as (x, y);"); + pig.registerQuery("B = filter A by x < 10.0;"); + pig.showExamples("B"); + assertEquals(1, 1); + } + + @Test + public void testFlatten() throws Exception { + pig = new PigServer(initString); + pig.registerQuery("A1 = load '" + A.toString() + "' as (x, y);"); + pig.registerQuery("B1 = load '" + B.toString() + "' as (x, y);"); + pig.registerQuery("C1 = load '" + C.toString() + "' as (x, y);"); + pig.registerQuery("D1 = load '" + D.toString() + "' as (x, y);"); + pig.registerQuery("E = join A1 by x, B1 by x;"); + pig.registerQuery("F = join C1 by x, D1 by x;"); + pig.registerQuery("G = join E by $0, F by $0;"); + pig.showExamples("G"); + assertEquals(1, 1); + } +}