I am trying to parse logs to get some basic aggregate per-user information: in this 
particular case, the average, variance, and standard deviation of start time, end 
time, and hours worked, by day of week.  The logs are stored in HDFS, partitioned 
into roughly 15-minute chunks in directories by year/month/day/hour.  We have some 
utilities that parse the fields of a log line into a map of key/value pairs (I know 
that Pig doesn't really use maps, but since the utility is already there I didn't 
see any reason to reinvent the wheel, and we have other code that also expects 
maps).  I implemented my own loader that reads each line, hands it off to the 
mapping utility, and then, to reduce the size of records, holds onto only the 
key/value pairs specified in the loader's constructor.
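For illustration, here is a minimal Python sketch of the per-line filtering the loader does; the `key=value` token format and the `parse_line` helper are assumptions for the example, not our actual utility:

```python
def parse_line(line, wanted_keys):
    """Parse 'key=value' tokens from a log line into a dict,
    keeping only the requested keys (mirrors the loader's filtering).
    Returns None when no wanted key is present, so such records
    can be filtered out downstream."""
    record = {}
    for token in line.split():
        if "=" not in token:
            continue
        key, _, value = token.partition("=")
        if key in wanted_keys:
            record[key] = value
    return record or None

sample = "node=app01.cloud uid=jsmith long_timestamp=1389024000000 extra=x"
print(parse_line(sample, {"node", "uid", "long_timestamp"}))
```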

log = LOAD '$data' USING
    com.mrcy.intelligence.mitas.pig.load.SieveLoader('node', 'uid', 'long_timestamp');
log_map = FILTER log BY $0 IS NOT NULL AND $0#'uid' IS NOT NULL;

-- We have two LDAP servers, and that is the only way to find the users, so we
-- have to figure out which one to use and then look them up.
-- Note: MATCHES takes a regex that must match the whole string, so the
-- glob-style '*.cloud*' is invalid; use '.*cloud.*' instead.
SPLIT log_map INTO cloud IF $0#'node' MATCHES '.*cloud.*', dev OTHERWISE;
cloud = FOREACH cloud GENERATE $0#'uid' AS uid,
    (long)$0#'long_timestamp' AS long_timestamp,
    'mis01.dev.mrcy-intel.com' AS domain, '192.168.0.231' AS ldap_server;
dev = FOREACH dev GENERATE $0#'uid' AS uid,
    (long)$0#'long_timestamp' AS long_timestamp,
    'dev.mrcy-intel.com' AS domain, '10.1.11.231' AS ldap_server;
modified_logs = UNION dev, cloud;
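Pig's MATCHES uses Java regex semantics: the pattern must match the entire string, and a glob like `*.cloud*` is not a valid regex (a leading `*` has nothing to repeat). A quick Python sketch shows the `.*cloud.*` form; Python's `re` behaves the same way here, and the hostnames are made up:

```python
import re

# MATCHES-style whole-string matching: fullmatch, not search.
pattern = re.compile(r".*cloud.*")
print(bool(pattern.fullmatch("app01.cloud.example.com")))  # matches
print(bool(pattern.fullmatch("app01.dev.example.com")))    # does not match

# The glob-style pattern is rejected outright as a regex:
try:
    re.compile(r"*.cloud*")
except re.error as e:
    print("invalid regex:", e)
```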

-- Calculate user times
user_times = FOREACH modified_logs GENERATE *, ToDate((long)long_timestamp) AS date;
aliased_user_times = FOREACH user_times GENERATE *,
    GetYear(date) AS year:int, GetMonth(date) AS month:int, GetDay(date) AS day:int,
    com.mrcy.intelligence.mitas.pig.eval.GetDayOfWeek(date) AS day_of_week;

-- Note: CONCAT of unpadded year/month/day (e.g. '201416' for Jan 6, 2014) does
-- not parse correctly as 'yyyyMMdd'; format the date directly instead.
aliased_user_times = FOREACH aliased_user_times GENERATE *,
    MilliSecondsBetween(ToDate(long_timestamp),
        ToDate(ToString(date, 'yyyyMMdd'), 'yyyyMMdd')) AS milliseconds_into_day;

user_days = GROUP aliased_user_times BY (uid, ldap_server, domain, year, month,
    day, day_of_week);
times_by_day = FOREACH user_days GENERATE
    FLATTEN(group) AS (uid, ldap_server, domain, year, month, day, day_of_week),
    MAX(aliased_user_times.milliseconds_into_day) AS max,
    MIN(aliased_user_times.milliseconds_into_day) AS min,
    MAX(aliased_user_times.long_timestamp) - MIN(aliased_user_times.long_timestamp)
        AS time_on;
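To sanity-check the milliseconds-into-day arithmetic, here is a small Python equivalent, assuming the timestamps are epoch milliseconds interpreted in UTC (the `ms_into_day` helper is ours, not part of the script):

```python
from datetime import datetime, timezone

def ms_into_day(epoch_ms):
    """Milliseconds elapsed since midnight (UTC) for an epoch-millisecond
    timestamp -- the same quantity the MilliSecondsBetween(...) call computes."""
    dt = datetime.fromtimestamp(epoch_ms / 1000.0, tz=timezone.utc)
    midnight = dt.replace(hour=0, minute=0, second=0, microsecond=0)
    return int((dt - midnight).total_seconds() * 1000)

# 2014-01-06 09:30:00 UTC -> 9.5 hours * 3600 * 1000 ms
ts = int(datetime(2014, 1, 6, 9, 30, tzinfo=timezone.utc).timestamp() * 1000)
print(ms_into_day(ts))  # 34200000
```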

times_by_day_of_week = GROUP times_by_day BY (uid, ldap_server, domain, day_of_week);
averages = FOREACH times_by_day_of_week GENERATE
    FLATTEN(group) AS (uid, ldap_server, domain, day_of_week), 'USER' AS type,
    AVG(times_by_day.min) AS start_avg, VAR(times_by_day.min) AS start_var,
    SQRT(VAR(times_by_day.min)) AS start_std,
    AVG(times_by_day.max) AS end_avg, VAR(times_by_day.max) AS end_var,
    SQRT(VAR(times_by_day.max)) AS end_std,
    AVG(times_by_day.time_on) AS hours_avg, VAR(times_by_day.time_on) AS hours_var,
    SQRT(VAR(times_by_day.time_on)) AS hours_std;
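For reference, the per-(uid, day_of_week) aggregates can be checked against Python's `statistics` module; the sample start times below are invented, and `pvariance` assumes VAR is a population variance (worth confirming against whichever VAR UDF is registered):

```python
import statistics

# One (uid, day_of_week) group's daily start times in ms-into-day:
# 8:30, 9:00, 8:45 -- made-up sample values.
starts = [30600000, 32400000, 31500000]

avg = statistics.mean(starts)       # like AVG(times_by_day.min)
var = statistics.pvariance(starts)  # like VAR(times_by_day.min), if population variance
std = var ** 0.5                    # like SQRT(VAR(times_by_day.min))

print(avg, var, std)
```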

Right now we are not calling the script with any additional optimizations or 
flags.  The job completes when run against one day (each hour is roughly 5 GB, 
so a day is around 100-150 GB), but fails horribly against more than two 
days.  

What can I try to fix this?  
