I am trying to parse logs to get some basic aggregate user information: in this
particular case, the average, variance, and standard deviation of the start
time, end time, and hours worked, broken down by day of week. The logs are
stored in HDFS, partitioned into roughly 15-minute chunks in directories by
year/month/day/hour. We have a utility that parses the fields of a log line
into a map of key/value pairs (I know that Pig doesn't really use maps, but
since the utility already exists I didn't see any reason to reinvent the wheel,
and we have other code that also expects maps). I implemented my own loader
that reads each line, hands it off to the mapping utility, and then, to reduce
the size of records, holds onto only the key/value pairs specified in the
loader's constructor.
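For illustration only, here is a rough Python sketch of that loader's per-line behavior; the "k=v" input format, the function names, and the default key list are assumptions for the example, not the real SieveLoader or mapping utility:

```python
# Hypothetical sketch of the per-line logic: parse a log line into key/value
# pairs, then keep only the keys named in the loader's constructor.

def parse_line(line):
    # Stand-in for the existing mapping utility: "k=v" tokens split on spaces.
    pairs = (token.split("=", 1) for token in line.split() if "=" in token)
    return {k: v for k, v in pairs}

def sieve(line, wanted=("node", "uid", "long_timestamp")):
    full = parse_line(line)
    # Drop everything except the requested keys to keep records small.
    return {k: v for k, v in full.items() if k in wanted}

record = sieve("node=app01.cloud uid=jdoe long_timestamp=1400000000000 msg=login")
```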
log = LOAD '$data' USING
com.mrcy.intelligence.mitas.pig.load.SieveLoader('node', 'uid',
'long_timestamp');
log_map = FILTER log BY $0 IS NOT NULL AND $0#'uid' IS NOT NULL;
--We have two LDAP servers and that is the only way to find the users, so we
--have to figure out which one to use and then look them up
SPLIT log_map INTO cloud IF $0#'node' MATCHES '.*cloud.*', dev OTHERWISE;
cloud = FOREACH cloud GENERATE $0#'uid' AS uid, $0#'long_timestamp' AS
long_timestamp:long, 'mis01.dev.mrcy-intel.com' AS domain, '192.168.0.231' AS
ldap_server;
dev = FOREACH dev GENERATE $0#'uid' AS uid, $0#'long_timestamp' AS
long_timestamp:long, 'dev.mrcy-intel.com' AS domain, '10.1.11.231' AS
ldap_server;
modified_logs = UNION dev, cloud;
--Calculate user times
user_times = FOREACH modified_logs GENERATE *, ToDate((long)long_timestamp) as
date;
aliased_user_times = FOREACH user_times GENERATE *, GetYear(date) AS year:int,
GetMonth(date) AS month:int, GetDay(date) AS day:int,
com.mrcy.intelligence.mitas.pig.eval.GetDayOfWeek(date) AS day_of_week;
--Truncate to midnight via a round trip through a date string; building
--'yyyyMMdd' by concatenating unpadded year/month/day mis-parses single-digit
--months and days
aliased_user_times = FOREACH aliased_user_times GENERATE *,
MilliSecondsBetween(date,
ToDate(ToString(date, 'yyyy-MM-dd'), 'yyyy-MM-dd')) AS miliseconds_into_day;
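The intent of miliseconds_into_day is just "epoch-millis timestamp minus midnight of the same day". A small Python sketch of that arithmetic (UTC is assumed here; the Pig script's actual timezone behavior depends on Pig's defaults):

```python
from datetime import datetime, timezone

def ms_into_day(long_timestamp):
    # Interpret the epoch-millis timestamp, then subtract that day's midnight.
    dt = datetime.fromtimestamp(long_timestamp / 1000.0, tz=timezone.utc)
    midnight = dt.replace(hour=0, minute=0, second=0, microsecond=0)
    return int((dt - midnight).total_seconds() * 1000)

# 09:30:00 UTC is 9.5 hours into the day, i.e. 34,200,000 ms.
```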
user_days = GROUP aliased_user_times BY (uid, ldap_server,domain, year, month,
day, day_of_week);
times_by_day = FOREACH user_days GENERATE FLATTEN(group) AS (uid, ldap_server,
domain, year, month, day, day_of_week),
MAX(aliased_user_times.miliseconds_into_day) AS max,
MIN(aliased_user_times.miliseconds_into_day) AS min,
MAX(aliased_user_times.long_timestamp) - MIN(aliased_user_times.long_timestamp)
AS time_on;
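As a sanity check on what each times_by_day row holds, here is a minimal Python sketch of the per-day rollup; the field names mirror the script, but the function and the input lists are hypothetical:

```python
# For one (uid, day) group: earliest and latest offset into the day, plus the
# span between the first and last raw timestamps ("time_on" for that day).
def day_rollup(ms_into_day, timestamps):
    return {
        "min": min(ms_into_day),
        "max": max(ms_into_day),
        "time_on": max(timestamps) - min(timestamps),
    }

row = day_rollup([100, 500, 300], [1000100, 1000500, 1000300])
```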
times_by_day_of_week = GROUP times_by_day BY (uid, ldap_server, domain,
day_of_week);
averages = FOREACH times_by_day_of_week GENERATE FLATTEN(group) AS (uid,
ldap_server, domain, day_of_week), 'USER' as type, AVG(times_by_day.min) AS
start_avg, VAR(times_by_day.min) AS start_var, SQRT(VAR(times_by_day.min)) AS
start_std, AVG(times_by_day.max) AS end_avg, VAR(times_by_day.max) AS end_var,
SQRT(VAR(times_by_day.max)) AS end_std, AVG(times_by_day.time_on) AS hours_avg,
VAR(times_by_day.time_on) AS hours_var, SQRT(VAR(times_by_day.time_on)) AS
hours_std;
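Assuming VAR here is a population-variance UDF (e.g. DataFu's Variance), each output column reduces to the following per-group arithmetic, sketched in Python with made-up input values:

```python
from math import sqrt

def stats(values):
    # Mean, population variance, and standard deviation of one group's values.
    n = len(values)
    avg = sum(values) / n
    var = sum((v - avg) ** 2 for v in values) / n
    return avg, var, sqrt(var)

# e.g. start-of-day offsets (ms) for one user's Mondays:
avg, var, std = stats([30600000, 32400000, 31500000])
```

One side note on the script itself: each VAR(...) is evaluated twice, once standalone and once inside SQRT(...); computing the variance once and deriving the standard deviation from it in a second FOREACH would avoid the duplicate work.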
Right now we are not calling the script with any additional optimizations or
flags. The job completes when run against one day (each hour is roughly 5 GB,
so a day is around 100-150 GB), but fails horribly against more than two days.
What can I try to fix this?