Hi Xuri,
This illustrates the use case for a UDF I've had to implement in one form or
another called 'FilterBag'. It's essentially just Pig's builtin "FILTER" but
would work like so (using your pseudocode):
A = load 'input' as (timestamp, worker, output);
--
-- Assuming you want to restrict each calculation to a day. ToDay is most
-- likely going to be Piggybank's ISOToDay truncation udf
--
with_day = foreach A generate timestamp, ToDay(timestamp) as day, worker, output;
--
-- First you'll have to get all output for a given worker on a given day into
-- a single bag
--
worker_output = foreach (group with_day by (worker, day)) {
    -- this relation (worker_output) will have one tuple per unique
    -- worker, day, and timestamp
    timestamps = distinct with_day.timestamp;
    generate
        flatten(group) as (worker, day),
        flatten(timestamps) as t1,
        -- a bag that contains all of this worker's output and their
        -- timestamps for this day
        with_day.(timestamp, output) as outputs;
};
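As a sanity check on the shape of worker_output, here's a hedged Python sketch of that grouping step (sample rows and all names here are hypothetical, and timestamps are simplified to minutes since midnight): flattening the distinct timestamps against the kept bag yields one row per (worker, day, distinct timestamp), each row carrying the full bag for that worker and day.

```python
from collections import defaultdict

# Hypothetical sample rows: (timestamp, day, worker, output).
# Timestamps are minutes since midnight for simplicity.
rows = [
    (840, "2013-07-26", "Joe", 50),
    (850, "2013-07-26", "Jane", 60),
    (855, "2013-07-26", "Joe", 55),
    (860, "2013-07-26", "Jane", 60),
]

# group with_day by (worker, day): collect each group's (timestamp, output) bag
bags = defaultdict(list)
for ts, day, worker, output in rows:
    bags[(worker, day)].append((ts, output))

# flatten(distinct timestamps) against the kept bag: one tuple per
# (worker, day, distinct timestamp), each carrying the whole bag
worker_output = [
    (worker, day, t1, bag)
    for (worker, day), bag in bags.items()
    for t1 in sorted({ts for ts, _ in bag})
]
```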
--
-- Next, filter each "outputs" bag to contain only outputs that occurred
-- within a 10 minute (or whatever time unit of interest) window from the
-- timestamp t1, looking forward (whether you look forward, back, or both
-- is up to you)
--
windowed = foreach worker_output {
    -- FilterBag(bag, field_num, comparison_string, to_compare)
    --   bag: bag to filter
    --   field_num: 0-indexed field number of the tuples in the bag to use
    --     for comparison to "to_compare"
    --   comparison_string: one of 'lt', 'lte', 'e', 'gte', 'gt',
    --     corresponding to less than, less than or equal to, and so on
    --   to_compare: the object to compare to
    outputs_after = FilterBag(outputs, 0, 'gte', t1);
    outputs_windowed = FilterBag(outputs_after, 0, 'lt', t1 + $TIME_UNIT);
    -- what we WANT to do is this:
    --
    --   outputs_windowed = filter outputs by timestamp >= t1 and
    --     timestamp < t1 + $TIME_UNIT;
    --
    -- but I have never been able to make Pig happy with this, thus FilterBag.
    generate
        worker, day, t1,
        SUM(outputs_windowed.output) as summed_output,
        COUNT(outputs_windowed) as count;
};
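In case it helps clarify what the UDF does, here's a hedged Python sketch of FilterBag's described semantics and the windowed aggregation (the UDF itself isn't published, so every name and value below is illustrative only):

```python
import operator

# The comparison strings FilterBag is described as accepting
_OPS = {"lt": operator.lt, "lte": operator.le, "e": operator.eq,
        "gte": operator.ge, "gt": operator.gt}

def filter_bag(bag, field_num, comparison, to_compare):
    """Keep tuples whose field_num-th field satisfies the comparison."""
    op = _OPS[comparison]
    return [t for t in bag if op(t[field_num], to_compare)]

# One worker's (timestamp, output) bag; timestamps in minutes for simplicity
outputs = [(840, 50), (855, 55), (870, 40)]
TIME_UNIT = 10
t1 = 850

outputs_after = filter_bag(outputs, 0, "gte", t1)                      # timestamp >= t1
outputs_windowed = filter_bag(outputs_after, 0, "lt", t1 + TIME_UNIT)  # timestamp < t1 + 10

summed_output = sum(o for _, o in outputs_windowed)
count = len(outputs_windowed)
```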
dump windowed;
Notice that you'll have one record for each worker and timestamp that was
actually measured. You'll have to do something fancier if you want smoothing
(e.g. a record for timestamps where no data was recorded).
Importantly, it would be fantastic to be able to do this without a udf and just
using Pig's filter command as shown in the comments above. However, I've tried
this in several different ways and never gotten Pig to be happy with it.
Instead, I've written a udf called "FilterBag" to accomplish this. Perhaps
another Pig user can illuminate the situation better?
I'll see about publishing a simple version of FilterBag if it seems the Pig
community would use it.
--jacob
@thedatachef
On Jul 28, 2013, at 8:34 PM, Xuri Nagarin wrote:
> Hi,
>
> Let's say I have a data set of units of output per worker per second that's
> in chronological order for a whole day
>
> Example:
> 2013-07-26T14:00:00, Joe, 50
> 2013-07-26T14:10:00, Jane,60
> 2013-07-26T14:15:00, Joe, 55
> 2013-07-26T14:20:00, Jane,60
>
> I create the data set above by loading a larger data set and getting these
> three attributes in a relation.
>
> Now, I want to count output per user per unit of time period, say every ten
> minutes but as a rolling count with a window that moves by the minute. The
> pseudo-code would be something along the lines of:
>
> -----------xxxxxxxxxxxxxxxxx-------------------
> A = LOAD 'input' AS (timestamp, worker, output);
>
> ts1=0
> ts2=1440 (24 hours x 60 mins/hr)
>
> for (i=ts1, i<=(ts2-10), i++)
> {
> R1 = FILTER A BY timestamp > $i AND timestamp < ($i + 10);
> GRP = R1 BY (worker, output);
> CNT = FOREACH GRP GENERATE group, COUNT(GRP);
> DUMP CNT;
> }
> -----------xxxxxxxxxxxxxxxxx-------------------
>
> But I can't figure out how to do this simple iteration in pig using
> FOREACH. I think the answer is create a relation that has a data set that
> has all the minutes in a day {0.....1440} and then iterate over it?
>
> Sorry if my Pig terminology isn't correct. I have been using it only for a
> day now.
>
> Any pointers will be highly appreciated.
>
> TIA.