Thanks, Jonathan; your code looks fine. I think there is a bug in Pig's serialization code for bags.
From a quick look, the size of the bag is a long, but it is read as an int when comparing bags. I'm not sure this is the culprit, though. I think you should open a JIRA issue for this. -- Gianmarco De Francisci Morales On Tue, Jan 25, 2011 at 21:47, Jonathan Coveney <[email protected]> wrote: > package squeal.fun; > import java.util.Iterator; > import java.util.List; > import java.util.ArrayList; > import java.util.Map; > import java.util.HashMap; > import java.util.Set; > import java.util.HashSet; > import java.io.IOException; > import org.apache.pig.PigException; > import org.apache.pig.backend.executionengine.ExecException; > import org.apache.pig.EvalFunc; > import org.apache.pig.data.Tuple; > import org.apache.pig.data.DataBag; > import org.apache.pig.data.BagFactory; > import org.apache.pig.data.TupleFactory; > import org.apache.pig.impl.util.WrappedIOException; > import org.apache.pig.impl.logicalLayer.schema.Schema; > import org.apache.pig.data.DataType; > import squeal.com.MutableInt; > > public class numgraph extends EvalFunc<DataBag>{ > TupleFactory mTupleFactory = TupleFactory.getInstance(); > BagFactory mBagFactory = BagFactory.getInstance(); > > public DataBag exec(Tuple input) throws IOException { > try { > accumulate(input); > DataBag bag = getValue(); > System.out.println(input.get(0).toString()); > System.out.println(bag.toString()); > return bag; > > } catch (Exception e) { > int errCode = 31415; > String msg = "Error while accumulating graphs (exec) " + > this.getClass().getSimpleName(); > throw new ExecException(msg, errCode, PigException.BUG, e); > } > } > > public void accumulate(Tuple input) throws IOException { > try { > buildgraph(input); > } catch (Exception e) { > int errCode = 31415; > String msg = "Error while accumulating graphs (accumulate) " + > this.getClass().getSimpleName(); > throw new ExecException(msg, errCode, PigException.BUG, e); > } > } > > //public void cleanup() { thegraph.clear(); } > > public DataBag getValue() throws IOException 
{ > try { > return thegraph.toBag(); > } catch (Exception e) { > int errCode = 31415; > String msg = "Error while accumulating graphs (getValue) " + > this.getClass().getSimpleName(); > throw new ExecException(msg, errCode, PigException.BUG, e); > } > } > > Graph thegraph = null; > private class Graph { > Map<numpair, MutableInt> graph; > > Graph() { graph = null; } > Graph(Map<numpair,MutableInt> gs) { graph = gs; } > > Map<numpair,MutableInt> getGraph() { return graph; } > void setGraph(Map<numpair,MutableInt> gs) { graph = gs; } > void inc(numpair look) { > MutableInt val = graph.get(look); > if (val == null) { > val = new MutableInt(); > graph.put(look,val); > } else { > val.inc(); > } > } > > void clear() { graph = null; } > > @Override > public String toString() { return graph.toString(); } > > void addPairsBag(DataBag c2s) throws IOException { > try { > List<String> c2list = new ArrayList<String>(); > for (Tuple tup : c2s) { > String cur = (String)tup.get(0); > for (String ne : c2list) > inc(new numpair(ne, cur)); > c2list.add(cur); > } > } catch (Exception e) { > int errCode = 31415; > String msg = "Error while accumulating graphs (addPairsBag) " + > this.getClass().getSimpleName(); > throw new ExecException(msg, errCode, PigException.BUG, e); > } > } > > //This creates a databag in the form of (c2, c2, hits) > DataBag toBag() throws IOException { > try { > DataBag outBag = mBagFactory.newDefaultBag(); > for (Map.Entry<numpair,MutableInt> pairs : graph.entrySet()) { > List inList = new ArrayList(); > Iterator<String> sIt = pairs.getKey().getPartsIt(); > inList.add(sIt.next()); inList.add(sIt.next()); > inList.add(pairs.getValue()); > outBag.add(mTupleFactory.newTuple(inList)); > } > return outBag; > } catch (Exception e) { > int errCode = 31415; > String msg = "Error while accumulating graphs (toBag) " + > this.getClass().getSimpleName(); > throw new ExecException(msg, errCode, PigException.BUG, e); > } > } > } > private class numpair { > Set<String> pair; 
> > numpair(String p1, String p2) { > pair = new HashSet<String>(2,1); > pair.add(p1); > pair.add(p2); > } > Set<String> getPair() { return pair; } > Iterator<String> getPartsIt() { return pair.iterator(); } > @Override > public boolean equals(Object p) { > return p instanceof numpair && ((numpair)p).getPair().equals(pair); > } > @Override > public int hashCode() { > return pair.hashCode(); > } > > public String toString() { return pair.toString(); } > } > > private void buildgraph(Tuple input) throws IOException { > if (input == null || input.size() == 0) > return; > try { > if (thegraph == null) > thegraph = new Graph(new HashMap<numpair,MutableInt>()); > if (thegraph.getGraph() == null) > thegraph.setGraph(new HashMap<numpair, MutableInt>()); > DataBag bag = (DataBag)input.get(0); > for (Tuple ne : bag) > thegraph.addPairsBag((DataBag)ne.get(0)); > } catch (ExecException ee) { > throw ee; > } catch (Exception e) { > int errCode = 31415; > String msg = "Error while accumulating graphs in " + > this.getClass().getSimpleName(); > throw new ExecException(msg, errCode, PigException.BUG, e); > } > } > > > @Override > public Schema outputSchema(Schema input) { > try { > Schema bagSchema = new Schema(); > bagSchema.add(new Schema.FieldSchema("c2_1",DataType.CHARARRAY)); > bagSchema.add(new Schema.FieldSchema("c2_2",DataType.CHARARRAY)); > bagSchema.add(new Schema.FieldSchema("hits",DataType.INTEGER)); > return new Schema(new > Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), > input), bagSchema, DataType.BAG)); > } catch (Exception e) { > return null; > } > } > } >
