Charles, I did open an issue here: <https://issues.apache.org/jira/browse/PIG-1826>. It'd be great if you could upload your script and UDF! Hopefully having more vectors will help make it easier to figure out what is going on.
2011/2/10 Charles Gonçalves <[email protected]> > Jonathan, > > did you open the issue? > I'm suffering from the same problem ... > > Any follow up? > > On Wed, Jan 26, 2011 at 4:17 PM, Jonathan Coveney <[email protected] > >wrote: > > > I have never really had to raise a bug before, what should I do? Open a > > ticket, attach the code and the description, and posit that it may be a > > serializable error? > > > > It would make me feel so good if there was a bug, though...I've tried to > > implement some more elegant algebraic versions of things and gotten the > > same > > error. > > > > thanks for taking a look at this > > > > 2011/1/26 Gianmarco <[email protected]> > > > > > Thanks Jonathan, > > > your code looks fine. > > > I think there is a bug in the serialization code for bags. > > > > > > From a quick look, the size of the bag is a long but it is read as an > > > int when comparing bags. > > > Not sure this is the culprit though. > > > > > > I think you should open a Jira for this. 
> > > -- > > > Gianmarco De Francisci Morales > > > > > > > > > > > > On Tue, Jan 25, 2011 at 21:47, Jonathan Coveney <[email protected]> > > > wrote: > > > > package squeal.fun; > > > > import java.util.Iterator; > > > > import java.util.List; > > > > import java.util.ArrayList; > > > > import java.util.Map; > > > > import java.util.HashMap; > > > > import java.util.Set; > > > > import java.util.HashSet; > > > > import java.io.IOException; > > > > import org.apache.pig.PigException; > > > > import org.apache.pig.backend.executionengine.ExecException; > > > > import org.apache.pig.EvalFunc; > > > > import org.apache.pig.data.Tuple; > > > > import org.apache.pig.data.DataBag; > > > > import org.apache.pig.data.BagFactory; > > > > import org.apache.pig.data.TupleFactory; > > > > import org.apache.pig.impl.util.WrappedIOException; > > > > import org.apache.pig.impl.logicalLayer.schema.Schema; > > > > import org.apache.pig.data.DataType; > > > > import squeal.com.MutableInt; > > > > > > > > public class numgraph extends EvalFunc<DataBag>{ > > > > TupleFactory mTupleFactory = TupleFactory.getInstance(); > > > > BagFactory mBagFactory = BagFactory.getInstance(); > > > > > > > > public DataBag exec(Tuple input) throws IOException { > > > > try { > > > > accumulate(input); > > > > DataBag bag = getValue(); > > > > System.out.println(input.get(0).toString()); > > > > System.out.println(bag.toString()); > > > > return bag; > > > > > > > > } catch (Exception e) { > > > > int errCode = 31415; > > > > String msg = "Error while accumulating graphs (exec) " + > > > > this.getClass().getSimpleName(); > > > > throw new ExecException(msg, errCode, PigException.BUG, e); > > > > } > > > > } > > > > > > > > public void accumulate(Tuple input) throws IOException { > > > > try { > > > > buildgraph(input); > > > > } catch (Exception e) { > > > > int errCode = 31415; > > > > String msg = "Error while accumulating graphs (accumulate) " + > > > > this.getClass().getSimpleName(); > > > > 
throw new ExecException(msg, errCode, PigException.BUG, e); > > > > } > > > > } > > > > > > > > //public void cleanup() { thegraph.clear(); } > > > > > > > > public DataBag getValue() throws IOException { > > > > try { > > > > return thegraph.toBag(); > > > > } catch (Exception e) { > > > > int errCode = 31415; > > > > String msg = "Error while accumulating graphs (getValue) " + > > > > this.getClass().getSimpleName(); > > > > throw new ExecException(msg, errCode, PigException.BUG, e); > > > > } > > > > } > > > > > > > > Graph thegraph = null; > > > > private class Graph { > > > > Map<numpair, MutableInt> graph; > > > > > > > > Graph() { graph = null; } > > > > Graph(Map<numpair,MutableInt> gs) { graph = gs; } > > > > > > > > Map<numpair,MutableInt> getGraph() { return graph; } > > > > void setGraph(Map<numpair,MutableInt> gs) { graph = gs; } > > > > void inc(numpair look) { > > > > MutableInt val = graph.get(look); > > > > if (val == null) { > > > > val = new MutableInt(); > > > > graph.put(look,val); > > > > } else { > > > > val.inc(); > > > > } > > > > } > > > > > > > > void clear() { graph = null; } > > > > > > > > @Override > > > > public String toString() { return graph.toString(); } > > > > > > > > void addPairsBag(DataBag c2s) throws IOException { > > > > try { > > > > List<String> c2list = new ArrayList<String>(); > > > > for (Tuple tup : c2s) { > > > > String cur = (String)tup.get(0); > > > > for (String ne : c2list) > > > > inc(new numpair(ne, cur)); > > > > c2list.add(cur); > > > > } > > > > } catch (Exception e) { > > > > int errCode = 31415; > > > > String msg = "Error while accumulating graphs (addPairsBag) " + > > > > this.getClass().getSimpleName(); > > > > throw new ExecException(msg, errCode, PigException.BUG, e); > > > > } > > > > } > > > > > > > > //This creates a databag in the form of (c2, c2, hits) > > > > DataBag toBag() throws IOException { > > > > try { > > > > DataBag outBag = mBagFactory.newDefaultBag(); > > > > for 
(Map.Entry<numpair,MutableInt> pairs : graph.entrySet()) { > > > > List inList = new ArrayList(); > > > > Iterator<String> sIt = pairs.getKey().getPartsIt(); > > > > inList.add(sIt.next()); inList.add(sIt.next()); > > > > inList.add(pairs.getValue()); > > > > outBag.add(mTupleFactory.newTuple(inList)); > > > > } > > > > return outBag; > > > > } catch (Exception e) { > > > > int errCode = 31415; > > > > String msg = "Error while accumulating graphs (toBag) " + > > > > this.getClass().getSimpleName(); > > > > throw new ExecException(msg, errCode, PigException.BUG, e); > > > > } > > > > } > > > > } > > > > private class numpair { > > > > Set<String> pair; > > > > > > > > numpair(String p1, String p2) { > > > > pair = new HashSet<String>(2,1); > > > > pair.add(p1); > > > > pair.add(p2); > > > > } > > > > Set<String> getPair() { return pair; } > > > > Iterator<String> getPartsIt() { return pair.iterator(); } > > > > @Override > > > > public boolean equals(Object p) { > > > > return p instanceof numpair && ((numpair)p).getPair().equals(pair); > > > > } > > > > @Override > > > > public int hashCode() { > > > > return pair.hashCode(); > > > > } > > > > > > > > public String toString() { return pair.toString(); } > > > > } > > > > > > > > private void buildgraph(Tuple input) throws IOException { > > > > if (input == null || input.size() == 0) > > > > return; > > > > try { > > > > if (thegraph == null) > > > > thegraph = new Graph(new HashMap<numpair,MutableInt>()); > > > > if (thegraph.getGraph() == null) > > > > thegraph.setGraph(new HashMap<numpair, MutableInt>()); > > > > DataBag bag = (DataBag)input.get(0); > > > > for (Tuple ne : bag) > > > > thegraph.addPairsBag((DataBag)ne.get(0)); > > > > } catch (ExecException ee) { > > > > throw ee; > > > > } catch (Exception e) { > > > > int errCode = 31415; > > > > String msg = "Error while accumulating graphs in " + > > > > this.getClass().getSimpleName(); > > > > throw new ExecException(msg, errCode, PigException.BUG, e); > > 
> > } > > > > } > > > > > > > > > > > > @Override > > > > public Schema outputSchema(Schema input) { > > > > try { > > > > Schema bagSchema = new Schema(); > > > > bagSchema.add(new Schema.FieldSchema("c2_1",DataType.CHARARRAY)); > > > > bagSchema.add(new Schema.FieldSchema("c2_2",DataType.CHARARRAY)); > > > > bagSchema.add(new Schema.FieldSchema("hits",DataType.INTEGER)); > > > > return new Schema(new > > > > > > Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), > > > > input), bagSchema, DataType.BAG)); > > > > } catch (Exception e) { > > > > return null; > > > > } > > > > } > > > > } > > > > > > > > > > > > > -- > *Charles Ferreira Gonçalves * > http://homepages.dcc.ufmg.br/~charles/ > UFMG - ICEx - Dcc > Cel.: 55 31 87741485 > Tel.: 55 31 34741485 > Lab.: 55 31 34095840 >
