You don't need a JOIN at all. Use this instead:
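-- Same REGISTER and DEFINE statements as in your script
raw = LOAD 'data.txt' USING PigStorage AS (key:chararray, value:int, date:chararray);
-- Compute the epoch before grouping so it is a field that can be sorted on
withEpoch = FOREACH raw GENERATE key, value, date,
    (long) ISOToUnix(CustomFormatToISO(date, 'yyyy-MM-dd HH:mm:ss')) AS epoch;
rawGroup = GROUP withEpoch BY key;
data = FOREACH rawGroup {
    -- Newest record first, then keep only that one
    rawOrdered = ORDER withEpoch BY epoch DESC;
    rawLimited = LIMIT rawOrdered 1;
    GENERATE FLATTEN(rawLimited.(key, value, date));
};
--STORE data;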
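The nested FOREACH selects, within each key's group, the record with the latest timestamp. As a quick way to check the expected output, the Java sketch below reproduces that selection on the sample input from the original question. Instead of sorting each group and taking the first row (ORDER ... DESC, then LIMIT 1), it keeps a running maximum per key, which picks the same record. The class and method names are hypothetical, and the record type needs Java 16 or later.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LatestPerKey {
    // Mirrors the (key, value, date) schema of the LOAD statement.
    record Row(String key, int value, String date) {}

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // For each key, keeps the row with the latest date. On a tie the row
    // seen first is kept, so exactly one row per key is returned.
    static List<Row> latestPerKey(List<Row> rows) {
        Map<String, Row> latest = new LinkedHashMap<>();
        for (Row row : rows) {
            latest.merge(row.key(), row, (kept, candidate) ->
                    LocalDateTime.parse(candidate.date(), FORMAT)
                            .isAfter(LocalDateTime.parse(kept.date(), FORMAT))
                            ? candidate : kept);
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<Row> input = List.of(
                new Row("A", 10, "2011-01-01 23:59:00"),
                new Row("A", 11, "2011-01-01 23:59:59"),
                new Row("A", 12, "2011-01-01 23:00:59"),
                new Row("B", 20, "2011-02-01 01:00:00"),
                new Row("B", 21, "2011-02-02 01:00:00"),
                new Row("C", 30, "2011-03-01 03:00:00"));
        for (Row r : latestPerKey(input)) {
            // Same tuple layout as Pig's DUMP output
            System.out.println("(" + r.key() + "," + r.value() + "," + r.date() + ")");
        }
    }
}
```

Running it prints the same three tuples as the expected output in the question.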
I'd also recommend implementing your own UDF that converts your date straight to
epoch (Unix) time, instead of chaining CustomFormatToISO and ISOToUnix.
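Such a UDF would typically be a Java class extending Pig's `EvalFunc<Long>`. The sketch below shows only the conversion its `exec(Tuple)` method might perform, written against `java.time` rather than Joda-Time so it compiles without Pig on the classpath. The class name `EpochConvert` is hypothetical, and treating the timestamps as UTC is an assumption; any fixed UTC offset would produce the same ordering.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: the core logic a custom Pig UDF (a class extending
// EvalFunc<Long>) could call from its exec(Tuple) method.
public class EpochConvert {
    // Same pattern the Pig script passes to CustomFormatToISO.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Parses a 'yyyy-MM-dd HH:mm:ss' string and returns seconds since the
    // Unix epoch, treating the timestamp as UTC (an assumption).
    public static long toEpochSeconds(String date) {
        return LocalDateTime.parse(date, FORMAT).toEpochSecond(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        System.out.println(toEpochSeconds("2011-01-01 23:59:59")); // prints 1293926399
    }
}
```

Doing the parse in one step avoids building an intermediate ISO string for every record.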
Sincerely,
Marek M.
-----Original Message-----
From: Cheung, Po [mailto:[email protected]]
Sent: Friday, September 16, 2011 10:36 AM
To: [email protected]
Subject: Filtering records by key and date
I am trying to filter a set of records by key and last modified date so that
only one record is returned per key with the most recent date. I have a
working script below but wonder if there is a simpler and more elegant way to
do this.
Input:
KEY VALUE DATE
A 10 2011-01-01 23:59:00
A 11 2011-01-01 23:59:59
A 12 2011-01-01 23:00:59
B 20 2011-02-01 01:00:00
B 21 2011-02-02 01:00:00
C 30 2011-03-01 03:00:00
Output:
A 11 2011-01-01 23:59:59
B 21 2011-02-02 01:00:00
C 30 2011-03-01 03:00:00
REGISTER piggybank.jar;
REGISTER joda-time.jar;
DEFINE CustomFormatToISO
org.apache.pig.piggybank.evaluation.datetime.convert.CustomFormatToISO();
DEFINE ISOToUnix
org.apache.pig.piggybank.evaluation.datetime.convert.ISOToUnix();
raw = LOAD 'data.txt' USING PigStorage AS (key:chararray, value:int, date:chararray);
-- Convert date string to Unix time as a new column
data = FOREACH raw GENERATE key, value, date,
    (long) ISOToUnix(CustomFormatToISO(date, 'yyyy-MM-dd HH:mm:ss')) AS time:long;
grouped = GROUP data BY key;
-- Create a relation with key and max time only
latest = FOREACH grouped GENERATE flatten(group) AS key:chararray,
    (long) MAX(data.time) AS time:long;
-- Join 'data' with 'latest' on both key and time columns
joined = JOIN data BY (key, time), latest BY (key, time);
-- Project the original columns from the join
result = FOREACH joined GENERATE data::key, data::value, data::date;
DUMP result;
(A,11,2011-01-01 23:59:59)
(B,21,2011-02-02 01:00:00)
(C,30,2011-03-01 03:00:00)
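One behavioural difference between this script and the nested FOREACH is worth knowing: if a key has two records sharing the same latest timestamp, the JOIN on (key, time) keeps both, because each matches that key's single MAX row, whereas LIMIT 1 keeps only one. The hypothetical Java sketch below mimics the JOIN-based steps (maximum time per key, then every record whose time equals it) on a made-up input containing such a tie.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MaxThenJoin {
    record Row(String key, int value, String date) {}

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static List<Row> maxThenJoin(List<Row> rows) {
        // Step 1 (GROUP + MAX): latest time per key.
        Map<String, LocalDateTime> latest = new HashMap<>();
        for (Row row : rows) {
            LocalDateTime t = LocalDateTime.parse(row.date(), FORMAT);
            latest.merge(row.key(), t, (a, b) -> a.isAfter(b) ? a : b);
        }
        // Step 2 (JOIN on key and time): keep every row matching its key's max.
        // Output order here follows the input; Pig's JOIN guarantees no order.
        List<Row> result = new ArrayList<>();
        for (Row row : rows) {
            if (LocalDateTime.parse(row.date(), FORMAT).equals(latest.get(row.key()))) {
                result.add(row);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up input: two "A" records share the latest timestamp.
        List<Row> input = List.of(
                new Row("A", 11, "2011-01-01 23:59:59"),
                new Row("A", 13, "2011-01-01 23:59:59"),
                new Row("A", 10, "2011-01-01 23:59:00"),
                new Row("B", 21, "2011-02-02 01:00:00"));
        for (Row r : maxThenJoin(input)) {
            System.out.println("(" + r.key() + "," + r.value() + "," + r.date() + ")");
        }
    }
}
```

Both tied "A" records come back, so if exactly one record per key is required, the LIMIT 1 approach (or an extra tie-breaking step) is the safer choice.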