Hi Suraj, I can't run this test today, but I've managed to at least take a look at my code.
You are right. My wrong impression came from the fact that I'm keeping some class state across bags, and because of that I got confused about what was really happening. Thank you for your help; I'll be more careful next time.

Thanks a lot.
Rodrigo

2014-07-13 16:30 GMT+02:00 Suraj Nayak M <snay...@gmail.com>:

> Rodrigo,
>
> I tried the following to reproduce the problem. In my UDF *GroupTest*, I was
> getting data for one key at a time (see *LOG* below). I've added the UDF code,
> Pig code, input data and log output below. Let me know in case I have
> missed anything.
>
> *UDF Code:* (I intentionally added System.out.println calls in the UDF to
> check the bag size in the log.)
>
> package com.pigtutorial.ch01;
>
> import java.io.IOException;
> import java.util.Iterator;
>
> import org.apache.pig.EvalFunc;
> import org.apache.pig.data.BagFactory;
> import org.apache.pig.data.DataBag;
> import org.apache.pig.data.DataType;
> import org.apache.pig.data.Tuple;
>
> public class GroupTest extends EvalFunc<DataBag> {
>
>     @Override
>     public DataBag exec(Tuple input) throws IOException {
>
>         DataBag returnBag = BagFactory.getInstance().newDefaultBag();
>         if (input == null || input.size() == 0 || input.get(0) == null)
>             return null;
>         try {
>             DataBag bag = DataType.toBag(input.get(0));
>             System.out.println("Calling UDF with databag size : " + bag.size());
>
>             Iterator it = bag.iterator();
>             while (it.hasNext()) {
>                 Tuple t = (Tuple) it.next();
>                 System.out.println(t);
>                 returnBag.add(t);
>             }
>         } catch (Exception e) {
>             throw new IOException("Caught exception processing input row ", e);
>         }
>         return returnBag;
>     }
>
> }
>
> *Pig Code:*
>
> a =
>     LOAD 'src/resources/data/input/data_input'
>     USING PigStorage(',')
>     AS (KEY:chararray, VAL1:chararray, VAL2:chararray);
>
> b = GROUP a
>     BY KEY
>     PARALLEL 2;
>
> c = FILTER b
>     BY NOT IsEmpty(a);
>
> d = FOREACH c
>     GENERATE FLATTEN(com.pigtutorial.ch01.GroupTest(a));
>
> STORE
>     d
>     INTO 'src/resources/data/actual_output/group_output'
>     USING PigStorage(',');
>
> *INPUT:*
> key1,d1,d2
> key2,d1,d3
> key1,d1,d4
> key1,d1,d5
>
> *LOG:*
> Calling UDF with databag size : 3
> (key1,d1,d2)
> (key1,d1,d4)
> (key1,d1,d5)
> Calling UDF with databag size : 1
> (key2,d1,d3)
>
> --
> Thanks
> Suraj Nayak
>
>
> On Sunday 13 July 2014 04:36 AM, Rodrigo Ferreira wrote:
>
> > (answering again, but now including the mailing list :P)
> >
> > Thank you for your answer, Suraj.
> >
> > What you said is exactly what I expect, but I get something different.
> >
> > Using your example (the specific data is not important here), I get more
> > than one key, in order, in my UDF. Here's a sample of my UDF code:
> >
> >     DataBag bag = DataType.toBag(input.get(0));
> >     Iterator it = bag.iterator();
> >     while (it.hasNext()) {
> >         Tuple t = (Tuple) it.next();
> >         // Here I print the attribute used as the grouping key
> >     }
> >
> > What I get in the output is:
> >
> > key1
> > key1
> > key1
> > key2
> >
> > The point is that I'm using test data that is not really big (less than
> > 64 MB). Anyhow, Pig shouldn't put these keys together in the same bag!
> > Maybe this is a kind of optimization that I should turn off.
> >
> >
> > 2014-07-12 23:29 GMT+02:00 Suraj Nayak <snay...@gmail.com>:
> >
> >> Are you processing the bag in the UDF?
> >>
> >> Can you send sample records going into the UDF, using the DUMP command
> >> on alias C?
> >>
> >> If the data (alias A) is
> >> (key1,d1,d2)
> >> (key2,d1,d3)
> >> (key1,d1,d4)
> >> (key1,d1,d5)
> >>
> >> then on grouping by the 1st column, the data should be grouped as below:
> >>
> >> {(key1),{ (key1,d1,d2), (key1,d1,d4), (key1,d1,d5) }}
> >> {(key2),{ (key2,d1,d3) }}
> >>
> >> If you are passing data A to the UDF, you should get all records with
> >> the same key in the same bag.
> >>
> >> --
> >> Suraj Nayak
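Rodrigo's confusion came from keeping class state across bags. The sketch below shows how that can look like "several keys in one bag": if a UDF stores data in an instance field, rows from earlier bags are still present when later bags are processed, because Pig generally reuses one UDF instance for many exec() calls within a task. This is a minimal, Pig-free illustration assuming plain Java 9+; the class and method names (GroupStateDemo, LeakyUdf, StatelessUdf, groupByFirstField) are hypothetical, and the GROUP ... BY step is only simulated with a LinkedHashMap rather than run through Pig's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupStateDemo {

    // Simulates GROUP ... BY on the first field: one "bag" (list of rows) per key,
    // with keys kept in first-seen order. Not Pig's implementation, just an illustration.
    public static Map<String, List<String[]>> groupByFirstField(List<String[]> rows) {
        Map<String, List<String[]>> groups = new LinkedHashMap<>();
        for (String[] row : rows) {
            groups.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
        }
        return groups;
    }

    // Hypothetical UDF-like class that keeps its buffer in an instance field,
    // so keys from earlier bags are still there when later bags arrive.
    public static class LeakyUdf {
        private final List<String> seenKeys = new ArrayList<>();

        public List<String> exec(List<String[]> bag) {
            for (String[] row : bag) {
                seenKeys.add(row[0]);
            }
            return new ArrayList<>(seenKeys); // returns a copy, but the field is never reset
        }
    }

    // Hypothetical UDF-like class that builds a fresh result for every bag.
    public static class StatelessUdf {
        public List<String> exec(List<String[]> bag) {
            List<String> keys = new ArrayList<>(); // local variable: new on every call
            for (String[] row : bag) {
                keys.add(row[0]);
            }
            return keys;
        }
    }

    public static void main(String[] args) {
        // Same four rows as in the INPUT section of the thread.
        List<String[]> input = List.of(
                new String[] {"key1", "d1", "d2"},
                new String[] {"key2", "d1", "d3"},
                new String[] {"key1", "d1", "d4"},
                new String[] {"key1", "d1", "d5"});

        // One instance of each class is reused for every bag.
        LeakyUdf leaky = new LeakyUdf();
        StatelessUdf stateless = new StatelessUdf();
        for (List<String[]> bag : groupByFirstField(input).values()) {
            System.out.println("leaky:     " + leaky.exec(bag));
            System.out.println("stateless: " + stateless.exec(bag));
        }
    }
}
```

For the first bag both versions return [key1, key1, key1]. For the second bag the leaky version returns [key1, key1, key1, key2], the same sequence Rodrigo printed, while the stateless version returns only [key2]. Keeping per-call data in local variables, as GroupTest does with returnBag, avoids the problem.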