Xuri,
I don't think you can use functions in the LOAD statement like that. To do
something like that you'd need to write your own LoadFunc. As far as I can tell
at a glance (I haven't used Pig 0.11 much), the new DateTime functions are eval
functions. That means they only operate on tuples during execution (map-reduce,
or whatever emulates map-reduce in local mode) and _after_ the input location
has been resolved.
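
If the goal is just to get rid of the separate bash script, one option (a rough
sketch, not something I've tested against your data) is to let Pig's parameter
substitution run the date command for you. The path below is only a placeholder
for wherever your logs actually live, and LOGDATE is just a name I made up:

-- %declare runs the backticked shell command during preprocessing, before the
-- script is parsed, so LOAD only ever sees a plain literal path
%declare LOGDATE `date -u +%Y-%m-%d -d "3 days ago"`;
A = LOAD '/path/to/logs/$LOGDATE' USING PigStorage();

The date arithmetic still happens in the shell rather than in Pig's DateTime
functions, but it runs as part of the script itself instead of a separate
wrapper.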
--jacob
@thedatachef
On Jul 30, 2013, at 1:30 AM, Xuri Nagarin wrote:
> Thanks Jacob.
>
> I threw in a little bash hack to make it simpler. Before I run the Pig
> script, I run a bash script that writes out a timestamp for every 10 minutes
> of the day:
>
> *Shell*
>
> dt1=`date -u +%Y-%m-%dT00:00:00.000Z -d "10 days ago"`  # 10 days ago because I get data 10 days late :-)
>
> for ((i=0; i<1430; i=i+10)); do
>     date -u "+%Y-%m-%dT%H:%M:%S.000Z" -d "$dt1 +$i mins"
> done
>
> Gives me:
> .
> .
> 2013-07-20T22:00:00.000Z
> 2013-07-20T22:10:00.000Z
> 2013-07-20T22:20:00.000Z
> 2013-07-20T22:30:00.000Z
> .
> .
>
> If the file above is written out with that date as its filename, then I can
> refer to it in my Pig script as:
>
> %declare filepath `date -u +%Y-%m-%d -d "10 days ago"`;
> A1 = LOAD '$filepath.ts' USING PigStorage() AS (dt:datetime);
>
> Now, I can iterate over it:
>
> B = FOREACH A1 {
>     C = FILTER A BY timestamp > 'dt' AND timestamp < 'AddDuration(ToDate(dt),PT20M)';
>     .
>     do something()
> }
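>
> Spelled out in straight Pig, the effect I'm after is roughly this (just a
> sketch: it assumes A is the log relation with timestamp loaded as a datetime,
> and it pairs every window start with every record, which is obviously not
> cheap; if datetime comparison misbehaves, ToUnixTime on both sides would be an
> alternative):
>
> -- pair every window start from A1 with every log record in A, then keep
> -- the records that fall inside each 20-minute window
> crossed  = CROSS A1, A;
> windowed = FILTER crossed BY A::timestamp > A1::dt
>                          AND A::timestamp < AddDuration(A1::dt, 'PT20M');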
>
> What I want to do is not use the bash command and instead use Pig's
> datetime functions. Unfortunately, I am stuck in syntactical hell.
>
> A = LOAD '/path/to/logs/ToDate(SubtractDuration(CurrentTime(),'P3D'),'yyyy-MM-dd')' USING PigStorage();
>
> yields:
> "2013-07-29 23:28:05,565 [main] ERROR org.apache.pig.tools.grunt.Grunt -
> ERROR 1200: <line 1, column 66> mismatched input 'P3D' expecting SEMI_COLON"
>
> I have tried various combinations of enclosing the date calculation
> functions in single quotes, brackets etc but can't seem to get anything to
> work :(
>
>
>
>
> On Mon, Jul 29, 2013 at 6:05 AM, Jacob Perkins <[email protected]> wrote:
>
>> Hi Xuri,
>>
>> This illustrates the use case for a UDF I've had to implement in one form
>> or another called 'FilterBag'. It's essentially just Pig's builtin "FILTER"
>> but would work like so (using your pseudocode):
>>
>>
>> A = load 'input' as (timestamp, worker, output);
>>
>> --
>> -- Assuming you want to restrict each calculation to a day. ToDay is most likely
>> -- going to be Piggybank's ISOToDay truncation udf
>> --
>> with_day = foreach A generate timestamp, ToDay(timestamp) as day, worker, output;
>>
>> --
>> -- First you'll have to get all output for a given worker on a given day into a single bag
>> --
>> worker_output = foreach (group with_day by (worker, day)) {
>>     -- this relation (worker_output) will have one tuple per unique worker, day, and timestamp
>>     timestamps = distinct with_day.timestamp;
>>     generate
>>       flatten(group) as (worker, day),
>>       flatten(timestamps) as t1,
>>       -- a bag that contains all of this worker's output and their timestamps for this day
>>       with_day.(timestamp, output) as outputs;
>> };
>>
>> --
>> -- Next, filter each "outputs" bag to contain only outputs that occurred within
>> -- a 10 minute (or whatever time unit of interest) window from the timestamp,
>> -- looking forward (whether you look forward, back, or both is up to you)
>> --
>> windowed = foreach worker_output {
>>     -- FilterBag(bag, field_num, comparison_string, to_compare)
>>     -- bag:                the bag to filter
>>     -- field_num:          0-indexed field num of the tuples in the bag to use for comparison to "to_compare"
>>     -- comparison_string:  one of 'lt', 'lte', 'e', 'gte', 'gt', corresponding to less than, less than or equal to, and so on
>>     -- to_compare:         the object to compare to
>>
>>     outputs_after    = FilterBag(outputs, 0, 'gte', t1);
>>     outputs_windowed = FilterBag(outputs_after, 0, 'lt', t1+$TIME_UNIT);
>>
>>     -- what we WANT to do is this:
>>     --
>>     -- outputs_windowed = filter outputs by timestamp >= t1 and timestamp < t1+$TIME_UNIT;
>>     --
>>     -- but I have never been able to make pig happy with this, thus FilterBag.
>>
>>     generate
>>       worker, day, t1,
>>       SUM(outputs_windowed.output) as summed_output,
>>       COUNT(outputs_windowed) as count;
>> };
>>
>> dump windowed;
>>
>>
>>
>> Notice that you'll have one record for each worker and timestamp that was
>> actually measured. You'll have to do something more fancy if you want
>> smoothing (e.g. a record for timestamps where no data was recorded).
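>>
>> If you do want the smoothed version, one rough (untested) sketch is to build
>> the full set of (worker, window start) slots yourself ('window_starts' below
>> is a hypothetical pre-generated file of window-start timestamps) and left
>> outer join the windowed results onto it:
>>
>> worker_list = foreach A generate worker;
>> workers     = distinct worker_list;
>>
>> -- every (worker, window start) combination we expect to see
>> grid      = load 'window_starts' as (t1);
>> slots_raw = cross workers, grid;
>> slots     = foreach slots_raw generate workers::worker as worker, grid::t1 as t1;
>>
>> -- slots with no measured data come back as nulls and get counted as zero
>> -- (adjust the 0L literal if summed_output ends up with a different type)
>> dense    = join slots by (worker, t1) left outer, windowed by (worker, t1);
>> smoothed = foreach dense generate
>>              slots::worker as worker,
>>              slots::t1 as t1,
>>              (windowed::summed_output is null ? 0L : windowed::summed_output) as summed_output;
>>
>> Whether that's worth the extra cross and join depends on how sparse the data
>> actually is.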
>>
>> Importantly, it would be fantastic to be able to do this without a udf and
>> just using Pig's filter command as shown in the comments above. However,
>> I've tried this in several different ways and never gotten Pig to be happy
>> with it. Instead, I've written a udf called "FilterBag" to accomplish this.
>> Perhaps another Pig user can illuminate the situation better?
>>
>> I'll see about publishing a simple version of FilterBag if it seems the
>> pig community would use it.
>>
>> --jacob
>> @thedatachef
>>
>> On Jul 28, 2013, at 8:34 PM, Xuri Nagarin wrote:
>>
>>> Hi,
>>>
>>> Let's say I have a data set of units of output per worker per second that's
>>> in chronological order for a whole day.
>>>
>>> Example:
>>> 2013-07-26T14:00:00, Joe, 50
>>> 2013-07-26T14:10:00, Jane,60
>>> 2013-07-26T14:15:00, Joe, 55
>>> 2013-07-26T14:20:00, Jane,60
>>>
>>> I create the data set above by loading a larger data set and pulling these
>>> three attributes into a relation.
>>>
>>> Now, I want to count output per worker per unit of time, say every ten
>>> minutes, but as a rolling count with a window that moves by the minute. The
>>> pseudo-code would be something along the lines of:
>>>
>>> -----------xxxxxxxxxxxxxxxxx-------------------
>>> A = LOAD 'input' AS (timestamp, worker, output);
>>>
>>> ts1=0
>>> ts2=1440 (24 hours x 60 mins/hr)
>>>
>>> for (i=ts1, i<=(ts2-10), i++)
>>> {
>>> R1 = FILTER A BY timestamp > $i AND timestamp < ($i + 10);
>>> GRP = GROUP R1 BY (worker, output);
>>> CNT = FOREACH GRP GENERATE group, COUNT(R1);
>>> DUMP CNT;
>>> }
>>> -----------xxxxxxxxxxxxxxxxx-------------------
>>>
>>> But I can't figure out how to do this simple iteration in Pig using
>>> FOREACH. I think the answer is to create a relation that contains all the
>>> minutes in a day {0.....1440} and then iterate over it?
>>>
>>> Sorry if my Pig terminology isn't correct. I have only been using it for a
>>> day now.
>>>
>>> Any pointers will be highly appreciated.
>>>
>>> TIA.
>>
>>