Thanks, Alan. I got the sum to work based on your example. Here is the
script with the sum code now working; I am sure there is a better way to do
what I am after.
The script takes key-value log data and rolls it up based on several of the
keys. Being a Perl rather than Java coder, I stream the data through a Perl
script that parses the key-value pairs and loads them into the Pig schema. I
then filter the rows to the desired subset, group by the required keys, and
calculate the group counts. Next, I keep only the groups with a count greater
than 100. This is where I wanted to calculate the sums, since I want to
compare the total across all groups to the total across only the filtered
groups. Finally, I order the filtered groups by count and store the results.
-- Ship the Perl parser to the cluster and register it for streaming.
DEFINE parse_stream `parse_pi_timeframe.pl`
    SHIP ('/home/hadoop/pig_test/parse_pi_timeframe.pl');

--A = LOAD '/user/hadoop/test_data_2/ads2x03-1256523357.log' USING PigStorage();
--A = LOAD '/user/hadoop/small_test.in' USING PigStorage();
A = LOAD '/user/hadoop/test*/ads*.log' USING PigStorage();

-- Parse the key-value log lines into the schema via the Perl streaming script.
B = STREAM A THROUGH parse_stream AS (site:chararray, cat:chararray, tf:chararray,
    pos:chararray, dma:chararray, state:chararray, country:chararray);

-- Keep only the 'fcst' category, then group on the rollup keys and count each group.
raw = FILTER B BY cat == 'fcst';
site_group = GROUP raw BY (site, tf, pos, country, state, dma) PARALLEL 8;
count_grouped = FOREACH site_group GENERATE group AS g1, COUNT(raw) AS imp_count:long;

-- Keep only the groups with more than 100 impressions.
filtered_counted = FILTER count_grouped BY imp_count > 100;

-- Total impressions across all groups.
sum_group = GROUP count_grouped ALL PARALLEL 8;
sum_impressions = FOREACH sum_group GENERATE SUM(count_grouped.imp_count);
STORE sum_impressions INTO '/user/hadoop/output/total_impressions.out' USING PigStorage('|');

-- Total impressions across only the filtered groups.
filtered_sum_group = GROUP filtered_counted ALL PARALLEL 8;
sum_filtered = FOREACH filtered_sum_group GENERATE SUM(filtered_counted.imp_count);
STORE sum_filtered INTO '/user/hadoop/output/filtered_impressions.out' USING PigStorage('|');

-- Order the filtered groups by count and store them.
order_group = ORDER filtered_counted BY imp_count PARALLEL 8;
STORE order_group INTO '/user/hadoop/output/test_filtered.out' USING PigStorage('|');
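In case it helps to see the other half, here is a stripped-down sketch of the
streaming parser. This is not the actual parse_pi_timeframe.pl, just an
illustration: it assumes the log lines are whitespace-separated key=value
pairs, and it emits tab-separated fields in the order the Pig schema above
expects.

#!/usr/bin/perl
# Simplified illustration of the streaming parser (not the real
# parse_pi_timeframe.pl). Assumes each input line is a set of
# whitespace-separated key=value pairs.
use strict;
use warnings;

# Field order must match the Pig schema: site, cat, tf, pos, dma, state, country.
my @fields = qw(site cat tf pos dma state country);

while (my $line = <STDIN>) {
    chomp $line;
    my %kv;
    foreach my $pair (split /\s+/, $line) {
        my ($k, $v) = split /=/, $pair, 2;
        $kv{$k} = $v if defined $v;
    }
    # Emit one tab-separated record per input line for Pig to pick up;
    # missing keys become empty fields.
    print join("\t", map { defined $kv{$_} ? $kv{$_} : '' } @fields), "\n";
}

Pig's default streaming deserializer treats the script's output as
tab-delimited fields, which is why the record is printed with "\t".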