On Jan 26, 2011, at 10:17 AM, Jonathan Coveney wrote:

I have never really had to raise a bug before, what should I do? Open a ticket, attach the code and the description, and posit that it may be a
serialiazable error?

Yes.


It would make me feel so good if there was a bug, though...I've tried to implement some more elegant algebraic versions of things and gotten the same
error.



Alan.


thanks for taking a look at this

2011/1/26 Gianmarco <[email protected]>

Thanks Jonathan,
your code looks fine.
I think there is a bug in the serialization code for bags.

From a quick look, the size of the bag is a long but it is read as an
int when comparing bags.
Not sure this is the culprit though.

I think you should open a Jira for this.
--
Gianmarco De Francisci Morales



On Tue, Jan 25, 2011 at 21:47, Jonathan Coveney <[email protected]>
wrote:
package squeal.fun;
import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.HashSet;
import java.io.IOException;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.WrappedIOException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.data.DataType;
import squeal.com.MutableInt;

public class numgraph extends EvalFunc<DataBag>{
TupleFactory mTupleFactory = TupleFactory.getInstance();
BagFactory mBagFactory = BagFactory.getInstance();

public DataBag exec(Tuple input) throws IOException {
try {
accumulate(input);
DataBag bag = getValue();
System.out.println(input.get(0).toString());
System.out.println(bag.toString());
return bag;

} catch (Exception e) {
int errCode = 31415;
String msg = "Error while accumulating graphs (exec) " +
this.getClass().getSimpleName();
throw new ExecException(msg, errCode, PigException.BUG, e);
}
}

public void accumulate(Tuple input) throws IOException {
try {
buildgraph(input);
} catch (Exception e) {
int errCode = 31415;
String msg = "Error while accumulating graphs (accumulate) " +
this.getClass().getSimpleName();
throw new ExecException(msg, errCode, PigException.BUG, e);
}
}

//public void cleanup() { thegraph.clear(); }

public DataBag getValue() throws IOException {
try {
return thegraph.toBag();
} catch (Exception e) {
int errCode = 31415;
String msg = "Error while accumulating graphs (getValue) " +
this.getClass().getSimpleName();
throw new ExecException(msg, errCode, PigException.BUG, e);
}
}

Graph thegraph = null;
private class Graph {
Map<numpair, MutableInt> graph;

Graph() { graph = null; }
Graph(Map<numpair,MutableInt> gs) { graph = gs; }

Map<numpair,MutableInt> getGraph() { return graph; }
void setGraph(Map<numpair,MutableInt> gs) { graph = gs; }
void inc(numpair look) {
MutableInt val = graph.get(look);
if (val == null) {
val = new MutableInt();
graph.put(look,val);
} else {
val.inc();
}
}

void clear() { graph = null; }

@Override
public String toString() { return graph.toString(); }

void addPairsBag(DataBag c2s) throws IOException {
try {
List<String> c2list = new ArrayList<String>();
for (Tuple tup : c2s) {
String cur = (String)tup.get(0);
for (String ne : c2list)
inc(new numpair(ne, cur));
c2list.add(cur);
}
} catch (Exception e) {
int errCode = 31415;
String msg = "Error while accumulating graphs (addPairsBag) " +
this.getClass().getSimpleName();
throw new ExecException(msg, errCode, PigException.BUG, e);
}
}

//This creates a databag in the form of (c2, c2, hits)
DataBag toBag() throws IOException {
try {
DataBag outBag = mBagFactory.newDefaultBag();
for (Map.Entry<numpair,MutableInt> pairs : graph.entrySet()) {
List inList = new ArrayList();
Iterator<String> sIt = pairs.getKey().getPartsIt();
inList.add(sIt.next()); inList.add(sIt.next());
inList.add(pairs.getValue());
outBag.add(mTupleFactory.newTuple(inList));
}
return outBag;
} catch (Exception e) {
int errCode = 31415;
String msg = "Error while accumulating graphs (toBag) " +
this.getClass().getSimpleName();
throw new ExecException(msg, errCode, PigException.BUG, e);
}
}
}
private class numpair {
Set<String> pair;

numpair(String p1, String p2) {
pair = new HashSet<String>(2,1);
pair.add(p1);
pair.add(p2);
}
Set<String> getPair() { return pair; }
Iterator<String> getPartsIt() { return pair.iterator(); }
@Override
public boolean equals(Object p) {
return p instanceof numpair && ((numpair)p).getPair().equals(pair);
}
@Override
public int hashCode() {
return pair.hashCode();
}

public String toString() { return pair.toString(); }
}

private void buildgraph(Tuple input) throws IOException {
if (input == null || input.size() == 0)
return;
try {
if (thegraph == null)
thegraph = new Graph(new HashMap<numpair,MutableInt>());
if (thegraph.getGraph() == null)
thegraph.setGraph(new HashMap<numpair, MutableInt>());
DataBag bag = (DataBag)input.get(0);
for (Tuple ne : bag)
thegraph.addPairsBag((DataBag)ne.get(0));
} catch (ExecException ee) {
throw ee;
} catch (Exception e) {
int errCode = 31415;
String msg = "Error while accumulating graphs in " +
this.getClass().getSimpleName();
throw new ExecException(msg, errCode, PigException.BUG, e);
}
}


@Override
public Schema outputSchema(Schema input) {
try {
Schema bagSchema = new Schema();
bagSchema.add(new Schema.FieldSchema("c2_1",DataType.CHARARRAY));
bagSchema.add(new Schema.FieldSchema("c2_2",DataType.CHARARRAY));
bagSchema.add(new Schema.FieldSchema("hits",DataType.INTEGER));
return new Schema(new
Schema .FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(),
input), bagSchema, DataType.BAG));
} catch (Exception e) {
return null;
}
}
}



Reply via email to