Hi Yen,
Does the function also implement Algebraic? In that case, it might end up using the algebraic interface of the UDF (via the combiner). If your FOREACH statement has functions that don't implement the Accumulator interface, then the reduce task won't run in accumulative mode, because the whole bag will have to be loaded into memory anyway.
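To make that rule concrete, here is a minimal Java sketch of how the reduce-side mode could be chosen, following only the behaviour described in this thread. It is not Pig's optimizer code: AlgebraicLike and AccumulatorLike are hypothetical stand-ins for org.apache.pig.Algebraic and org.apache.pig.Accumulator, and the precedence (combiner first, then accumulator, else whole bag) is an assumption based on what is reported here.

```java
import java.util.List;

// Hypothetical marker interfaces standing in for org.apache.pig.Algebraic and
// org.apache.pig.Accumulator, declared here so the sketch compiles without Pig.
interface AlgebraicLike {}
interface AccumulatorLike {}

// Where the UDF's work happens on the reduce side, in this simplified model.
enum ReduceMode { COMBINER_ALGEBRAIC, ACCUMULATIVE, WHOLE_BAG }

final class ModeChooser {
    private ModeChooser() {}

    // Simplified model of the behaviour described in this thread, NOT Pig's
    // optimizer code: (1) with the combiner enabled and every UDF algebraic,
    // the algebraic path wins; (2) otherwise accumulative mode is used only if
    // every UDF implements the accumulator interface; (3) otherwise the whole
    // bag is materialized and each UDF's exec() sees it at once.
    static ReduceMode choose(List<?> udfs, boolean combinerEnabled) {
        boolean allAlgebraic = udfs.stream().allMatch(u -> u instanceof AlgebraicLike);
        boolean allAccumulator = udfs.stream().allMatch(u -> u instanceof AccumulatorLike);
        if (combinerEnabled && allAlgebraic) {
            return ReduceMode.COMBINER_ALGEBRAIC;
        }
        if (allAccumulator) {
            return ReduceMode.ACCUMULATIVE;
        }
        return ReduceMode.WHOLE_BAG;
    }
}

// Hypothetical UDF shapes, used only to exercise the model.
final class AccumOnlyUdf implements AccumulatorLike {}
final class AlgebraicAndAccumUdf implements AlgebraicLike, AccumulatorLike {}
final class PlainUdf {}
```

Under these assumptions, a UDF that implements both interfaces lands in COMBINER_ALGEBRAIC when the combiner is on and in ACCUMULATIVE when it is off, and adding a single non-accumulator UDF to the same FOREACH drops the whole statement to WHOLE_BAG.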

If the query is running in accumulator mode, you will see this log message:
INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.AccumulatorOptimizer - Reducer is to run in accumulative mode.


I modified your query to:

stats = FOREACH grpd {
    pages = records.page;
    GENERATE group.$1 AS host, group.$0 AS domain, COUNT(pages) AS page_count:long;
};

and ran it with the combiner disabled:
bin/pig -Dpig.exec.nocombiner=true -x local -e 'explain -script /tmp/t.pig;'

Using the above log message, I was able to verify that it runs in accumulator mode.

Thanks,
Thejas




On 3/13/12 12:22 PM, Yen SYU wrote:
Hi Jon,

Thanks for your response! I'm using pig 0.9.1-snapshot.

I've used FLATTEN instead of $0 and $1, but ACCUM_CALL is still never incremented.
I also tried removing the generic type from the accumulator, but it did not help. :(

Are you able to get the accumulator to fire?

Yen

On Tue, Mar 13, 2012 at 3:06 PM, Jonathan Coveney <[email protected]> wrote:

What version of pig are you using?

Just as an experiment, in the simple case, can you try doing

GENERATE FLATTEN(group) AS (domain, host), ...(the rest)...

It shouldn't make a difference, but I think I remember that in some older
versions it did.

2012/3/13 Yen SYU <[email protected]>

Hi all,

I just tested a very simple Pig script, as follows:

records = LOAD '$input' AS (hash:chararray, domain:chararray,
    host:chararray, page:chararray, freq:int);
grpd = GROUP records BY (domain, host);
stats = FOREACH grpd {
    hashes = records.hash;
    uniq_hashes = DISTINCT hashes;
    pages = records.page;
    GENERATE group.$1 AS host, group.$0 AS domain,
        COUNT(uniq_hashes) AS hash_total:long,
        PAGE_COUNT(pages) AS page_count:long,
        SUM(records.freq) AS freq:long;
};
STORE stats INTO '$output';

where PAGE_COUNT is a custom UDF implementing Accumulator. I added
EXEC_CALL and ACCUM_CALL counters to this UDF, and it looks like the
accumulate method is never called. I even tried removing all the other
built-in UDFs and keeping the nested FOREACH as simple as:

stats = FOREACH grpd {
    pages = records.page;
    GENERATE group.$1 AS host, group.$0 AS domain, PAGE_COUNT(pages) AS page_count:long;
};
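A minimal sketch of a counter-instrumented accumulator UDF, assuming simplified stand-ins so it compiles without Pig on the classpath: AccumulatorSketch mirrors the three methods of Pig's Accumulator<T> (accumulate, getValue, cleanup) but takes a List instead of a Tuple, the AtomicLong fields stand in for the EXEC_CALL and ACCUM_CALL Hadoop counters, and PageCountSketch and ReducerSim are hypothetical names. It reproduces the symptom described here: if the reducer never enters accumulative mode, only EXEC_CALL moves.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Stand-in for org.apache.pig.Accumulator<T>: same three methods, but Pig's
// Tuple argument is replaced by a plain List holding one chunk of the bag.
interface AccumulatorSketch<T> {
    void accumulate(List<?> bagChunk);
    T getValue();
    void cleanup();
}

// Hypothetical PAGE_COUNT-like UDF that counts the rows in the bag and records
// which entry point the (simulated) runtime used.
final class PageCountSketch implements AccumulatorSketch<Long> {
    // Stand-ins for the EXEC_CALL / ACCUM_CALL Hadoop counters.
    static final AtomicLong EXEC_CALL = new AtomicLong();
    static final AtomicLong ACCUM_CALL = new AtomicLong();

    private long count = 0;

    // Whole-bag entry point (EvalFunc.exec in Pig): the full bag is in memory.
    Long exec(List<?> wholeBag) {
        EXEC_CALL.incrementAndGet();
        return (long) wholeBag.size();
    }

    @Override
    public void accumulate(List<?> bagChunk) {
        ACCUM_CALL.incrementAndGet();
        count += bagChunk.size();
    }

    @Override
    public Long getValue() {
        return count;
    }

    @Override
    public void cleanup() {
        count = 0;  // reset state before the next group key
    }
}

// Simulated reducer for one group: in accumulative mode the bag is fed to the
// UDF in fixed-size chunks; otherwise exec() sees the whole bag at once.
final class ReducerSim {
    private ReducerSim() {}

    static long run(PageCountSketch udf, List<?> bag, boolean accumulative, int chunkSize) {
        if (!accumulative) {
            return udf.exec(bag);
        }
        for (int i = 0; i < bag.size(); i += chunkSize) {
            udf.accumulate(bag.subList(i, Math.min(i + chunkSize, bag.size())));
        }
        long result = udf.getValue();
        udf.cleanup();
        return result;
    }
}
```

With a five-row bag and a chunk size of 2, the accumulative path calls accumulate three times and never calls exec; the whole-bag path calls exec once and returns the same count.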

Does anyone have an idea what's going on behind the scenes?

Thanks,
Yen



