Author: pisong Date: Mon Jul 21 04:30:57 2008 New Revision: 678392 URL: http://svn.apache.org/viewvc?rev=678392&view=rev Log: PIG-290 Fixed LOCross schema problem
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=678392&r1=678391&r2=678392&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java Mon Jul 21 04:30:57 2008 @@ -93,7 +93,7 @@ @Override public Schema getSchema() throws FrontendException { - log.trace("Entering getSchema"); + log.debug("Entering getSchema"); if (!mIsSchemaComputed) { List<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>( mForEachPlans.size()); @@ -228,7 +228,7 @@ } } errMessage += ". Please alias the columns with unique names."; - log.info(errMessage); + log.debug(errMessage); throw new FrontendException(errMessage); } mSchema = new Schema(fss); @@ -244,7 +244,7 @@ } mIsSchemaComputed = true; } - log.trace("Exiting getSchema"); + log.debug("Exiting getSchema"); return mSchema; } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=678392&r1=678391&r2=678392&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java Mon Jul 21 04:30:57 2008 @@ -249,8 +249,6 @@ private void checkInPlan(E op) throws PlanException { if (mOps.get(op) == null) { - log.debug("Attempt to connect operator " + - op.name() + " which is not in the plan."); PlanException pe = new PlanException("Attempt to connect operator " + op.name() + " which is not in the plan."); log.error(pe.getMessage()); @@ -368,14 +366,32 @@ E after, E newNode, E before) throws PlanException { - if (!disconnect(after, before)) { + checkInPlan(newNode); + if (!replaceNode(after, newNode, before, mFromEdges) || !replaceNode(before, newNode, after, mToEdges)) { PlanException pe = new PlanException("Attempt to insert between two nodes " + "that were not connected."); log.error(pe.getMessage()); throw pe; } - connect(after, newNode); - connect(newNode, before); + mFromEdges.put(newNode, before); + mToEdges.put(newNode, after); + } + + private boolean replaceNode(E src, E replacement, E dst, MultiMap<E, E> multiMap) { + Collection c = multiMap.get(src); + if (c == null) return false; + + ArrayList al = new ArrayList(c); + for(int i = 0; i < al.size(); ++i) { + E to = (E)al.get(i); + if(to.equals(dst)) { + al.set(i, replacement); + multiMap.removeKey(src); + multiMap.put(src, al); + return true; + } + } + return false; } /**