Hi Xuri,

This is a good use case for a UDF I've had to implement in one form or
another, called 'FilterBag'. It's essentially Pig's builtin "FILTER" applied
to a bag, and would work like so (using your pseudocode):


A = load 'input' as (timestamp, worker, output);

--
-- Assuming you want to restrict each calculation to a day. ToDay is most
-- likely going to be Piggybank's ISOToDay truncation udf
--
with_day = foreach A generate timestamp, ToDay(timestamp) as day, worker, output;

--
-- First you'll have to get all output for a given worker on a given day into
-- a single bag
--
worker_output = foreach (group with_day by (worker, day)) {
                          -- this relation (worker_output) will have one tuple
                          -- per unique worker, day, and timestamp
                          timestamps = distinct with_day.timestamp;
                          generate
                            flatten(group)               as (worker, day),
                            flatten(timestamps)          as t1,
                            -- a bag that contains all of this worker's output
                            -- and their timestamps for this day
                            with_day.(timestamp, output) as outputs;
                         };

--
-- Next, filter each "outputs" bag to contain only outputs that occurred within
-- a 10 minute (or whatever time unit of interest) window from the timestamp,
-- looking forward (whether you look forward, back, or both is up to you)
--
windowed = foreach worker_output {
                    -- FilterBag(bag, field_num, comparison_string, to_compare)
                    -- bag: bag to filter
                    -- field_num: 0 indexed field num of the tuples in the bag
                    --            to use for comparison to "to_compare"
                    -- comparison_string: one of 'lt', 'lte', 'e', 'gte', 'gt'
                    --                    corresponding to less than, less than or equal to and so on
                    -- to_compare: the object to compare to

                    outputs_after    = FilterBag(outputs, 0, 'gte', t1);
                    outputs_windowed = FilterBag(outputs_after, 0, 'lt', t1+$TIME_UNIT);

                    -- what we WANT to do is this:
                    --
                    -- outputs_windowed = filter outputs by timestamp >= t1 and timestamp < t1+$TIME_UNIT;
                    --
                    -- but, I have never been able to make pig happy with this,
                    -- thus FilterBag.

                    generate
                     worker, day, t1, SUM(outputs_windowed.output) as summed_output,
                     COUNT(outputs_windowed) as count;
                  };

dump windowed;



Notice that you'll have one record for each worker and timestamp that was
actually measured. You'll have to do something fancier if you want smoothing
(e.g. a record for timestamps where no data was recorded).
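
To make the shape of that result concrete, here is a minimal sketch of the same computation in plain Python rather than Pig. It assumes a forward-looking 10 minute window and, like the script above, never lets a window cross a day boundary. The name windowed_sums is purely illustrative, and the sample rows are the ones from the original question.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def windowed_sums(records, window=timedelta(minutes=10)):
    """records: iterable of (timestamp, worker, output) tuples.

    Returns one (worker, day, t1, summed_output, count) row per distinct
    (worker, timestamp) that was actually measured, summing the outputs
    whose timestamp falls in [t1, t1 + window).
    """
    # Collect each worker's (timestamp, output) pairs per calendar day,
    # mirroring the "outputs" bag built by the group-by in the Pig script.
    by_worker_day = defaultdict(list)
    for ts, worker, output in records:
        by_worker_day[(worker, ts.date())].append((ts, output))

    rows = []
    for (worker, day), outputs in sorted(by_worker_day.items()):
        # One result row per distinct measured timestamp (t1).
        for t1 in sorted({ts for ts, _ in outputs}):
            in_window = [out for ts, out in outputs if t1 <= ts < t1 + window]
            rows.append((worker, day, t1, sum(in_window), len(in_window)))
    return rows


# Sample data from the original question.
sample = [
    (datetime(2013, 7, 26, 14, 0), "Joe", 50),
    (datetime(2013, 7, 26, 14, 10), "Jane", 60),
    (datetime(2013, 7, 26, 14, 15), "Joe", 55),
    (datetime(2013, 7, 26, 14, 20), "Jane", 60),
]

for row in windowed_sums(sample):
    print(row)
```

With this sample every window holds a single measurement, because no worker has two readings less than 10 minutes apart, and no row is produced for minutes where nothing was recorded.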

Importantly, it would be fantastic to be able to do this without a udf and just 
using Pig's filter command as shown in the comments above. However, I've tried 
this in several different ways and never gotten Pig to be happy with it. 
Instead, I've written a udf called "FilterBag" to accomplish this. Perhaps 
another Pig user can illuminate the situation better?
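
For anyone unsure what FilterBag is meant to do, the following is a rough sketch of its semantics in plain Python, following the argument list given in the script comments. It is not the actual UDF (which would be written against Pig's UDF API); filter_bag and the toy data, with timestamps expressed as minutes, are assumptions made only for illustration.

```python
import operator

# Comparison strings as listed in the script comments.
_COMPARATORS = {
    "lt": operator.lt,
    "lte": operator.le,
    "e": operator.eq,
    "gte": operator.ge,
    "gt": operator.gt,
}


def filter_bag(bag, field_num, comparison_string, to_compare):
    """Keep the tuples whose field `field_num` (0 indexed) satisfies the
    named comparison against `to_compare`."""
    compare = _COMPARATORS[comparison_string]
    return [t for t in bag if compare(t[field_num], to_compare)]


# Toy bag of (timestamp, output) tuples; timestamps are minutes here.
outputs = [(0, 50), (5, 5), (15, 55)]
t1, time_unit = 0, 10

# Same two-step filtering as in the script: timestamp >= t1, then
# timestamp < t1 + time_unit.
outputs_after = filter_bag(outputs, 0, "gte", t1)
outputs_windowed = filter_bag(outputs_after, 0, "lt", t1 + time_unit)
print(outputs_windowed)  # [(0, 50), (5, 5)]
```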

I'll see about publishing a simple version of FilterBag if it seems the pig 
community would use it.

--jacob
@thedatachef
                          
On Jul 28, 2013, at 8:34 PM, Xuri Nagarin wrote:

> Hi,
> 
> Lets say I have a data set of units of output per worker per second that's
> in chronological order for a whole day
> 
> Example:
> 2013-07-26T14:00:00, Joe, 50
> 2013-07-26T14:10:00, Jane,60
> 2013-07-26T14:15:00, Joe, 55
> 2013-07-26T14:20:00, Jane,60
> 
> I create the data set above by loading a larger data set and getting these
> three attributes in a relation.
> 
> Now, I want to count output per user per unit of time period, say every ten
> minutes but as a rolling count with a window that moves by the minute. The
> pseudo-code would be something along the lines of:
> 
> -----------xxxxxxxxxxxxxxxxx-------------------
> A = LOAD 'input' AS (timestamp, worker, output);
> 
> ts1=0
> ts2=1440 (24 hours x 60 mins/hr)
> 
> for (i=ts1, i<=(ts2-10), i++)
>   {
>     R1 = FILTER A BY timestamp > $i AND timestamp < ($i + 10);
>     GRP = R1 BY (worker, output);
>     CNT = FOREACH GRP GENERATE group, COUNT(GRP);
>     DUMP CNT;
>    }
> -----------xxxxxxxxxxxxxxxxx-------------------
> 
> But I can't figure out how to do this simple iteration in pig using
> FOREACH. I think the answer is create a relation that has a data set that
> has all the minutes in a day {0.....1440} and then iterate over it?
> 
> Sorry if my Pig terminology isn't correct. I have been using it only for a
> day now.
> 
> Any pointers will be highly appreciated.
> 
> TIA.
