Hi Suraj, I can't run this test today, but I've managed to at least take a
look at my code.

You are right, my wrong impression came from the fact that I'm keeping some
class state across bags and because of that I got confused about what was
really happening. I thank you for your help, and I'll be more careful next
time.
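
For anyone hitting the same thing, here is a minimal plain-Java sketch of how state kept in a field can make it look as if keys from different groups arrive in one bag. It is not the actual UDF: the class and method names are hypothetical, no Pig classes are used, and it assumes that the same UDF instance is reused for successive bags within a task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a UDF (no Pig dependency). Assumption: one
// instance handles several bags in a row, so fields survive between calls.
class StatefulUdfSketch {

    // Instance state kept across calls; this is what causes the confusion.
    private final List<String> seenKeys = new ArrayList<>();

    // Leaky variant: appends to the shared field and returns everything
    // accumulated so far, including keys from earlier bags.
    List<String> execLeaky(List<String> bagKeys) {
        seenKeys.addAll(bagKeys);
        return new ArrayList<>(seenKeys);
    }

    // Clean variant: uses only state local to this call.
    List<String> execClean(List<String> bagKeys) {
        return new ArrayList<>(bagKeys);
    }

    public static void main(String[] args) {
        StatefulUdfSketch udf = new StatefulUdfSketch();
        List<String> bag1 = Arrays.asList("key1", "key1", "key1");
        List<String> bag2 = Arrays.asList("key2");

        System.out.println(udf.execLeaky(bag1)); // [key1, key1, key1]
        System.out.println(udf.execLeaky(bag2)); // [key1, key1, key1, key2]
        System.out.println(udf.execClean(bag2)); // [key2]
    }
}
```

The second leaky call reports key1 and key2 together even though each input bag held a single key, which matches the symptom discussed in this thread. Resetting such fields at the start of each call, or using local variables, avoids it.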

Thanks a lot.

Rodrigo.


2014-07-13 16:30 GMT+02:00 Suraj Nayak M <snay...@gmail.com>:

>  Rodrigo,
>
> I tried the following to reproduce the problem. In my UDF *GroupTest*, I was
> getting data for one key at a time (see *LOG* below). I have added the UDF
> code, Pig code, input data, and log output below. Let me know in case I have
> missed anything.
>
> *UDF Code :* (Intentionally added sysout in UDF to check bag size in the
> log)
> package com.pigtutorial.ch01;
>
> import java.io.IOException;
> import java.util.Iterator;
>
> import org.apache.pig.EvalFunc;
> import org.apache.pig.data.BagFactory;
> import org.apache.pig.data.DataBag;
> import org.apache.pig.data.DataType;
> import org.apache.pig.data.Tuple;
>
> public class GroupTest extends EvalFunc<DataBag> {
>
>     @Override
>     public DataBag exec(Tuple input) throws IOException {
>
>         DataBag returnBag = BagFactory.getInstance().newDefaultBag();
>         if (input == null || input.size() == 0 || input.get(0) == null)
>             return null;
>         try {
>             DataBag bag = DataType.toBag(input.get(0));
>             System.out.println("Calling UDF with databag size : " + bag.size());
>
>             Iterator it = bag.iterator();
>             while (it.hasNext()) {
>                 Tuple t = (Tuple) it.next();
>                 System.out.println(t);
>                 returnBag.add(t);
>             }
>         } catch (Exception e) {
>             throw new IOException("Caught exception processing input row ", e);
>         }
>         return returnBag;
>     }
>
> }
>
> *Pig Code :*
> a =
>     LOAD 'src/resources/data/input/data_input'
>     USING PigStorage(',')
>     AS (KEY:chararray, VAL1:chararray, VAL2:chararray);
>
> b = GROUP a
>     BY KEY
>     PARALLEL 2;
>
> c = FILTER b
>     BY NOT IsEmpty(a);
>
> d = FOREACH c
>     GENERATE FLATTEN(com.pigtutorial.ch01.GroupTest(a));
> STORE
>     d
>     INTO 'src/resources/data/actual_output/group_output'
>     USING PigStorage(',');
>
> *INPUT :*
> key1,d1,d2
> key2,d1,d3
> key1,d1,d4
> key1,d1,d5
>
> *LOG :*
> Calling UDF with databag size : 3
> (key1,d1,d2)
> (key1,d1,d4)
> (key1,d1,d5)
> Calling UDF with databag size : 1
> (key2,d1,d3)
>
> --
> Thanks
> Suraj Nayak
>
>
> On Sunday 13 July 2014 04:36 AM, Rodrigo Ferreira wrote:
>
> (answering again but now including the mailing list :P)
>
> Thank you for your answer Suraj.
>
>  What you said is exactly what I expect, but I get something different.
>
>  Using your example (the specific data is not important here), I get more
> than one key, in order, in my UDF. Here's a sample of the code of my UDF:
>
>
> DataBag bag = DataType.toBag(input.get(0));
>
> Iterator it = bag.iterator();
>
> while (it.hasNext()) {
>     Tuple t = (Tuple) it.next();
>     // Here I print the attribute used as the grouping key
> }
>
>
>  What I get in the output is:
>
>  key1
>  key1
>  key1
>  key2
>
>  The point is that I'm using test data that is not really big (less than
> 64MB). Anyhow, Pig shouldn't put these keys together in the same bag! Maybe
> this is a kind of optimization that I should turn off.
>
>
> 2014-07-12 23:29 GMT+02:00 Suraj Nayak <snay...@gmail.com>:
>
>> Are you processing the bag in the UDF?
>>
>> Can you send sample records that are going into the UDF, using the DUMP
>> command for alias C?
>>
>> If the data is (alias A)
>> (key1,d1,d2)
>> (key2,d1,d3)
>> (key1,d1,d4)
>> (key1,d1,d5)
>>
>> When grouping on the 1st column, the data should be grouped as below:
>>
>> {(key1),{ (key1,d1,d2), (key1,d1,d4), (key1,d1,d5) }}
>> {(key2),{ (key2,d1,d3) }}
>>
>> If you are passing the data from A to the UDF, you should get all records
>> for the same key in the same bag.
>>
>> --
>> Suraj Nayak
>>
>
>
>
