I don't get how this would be a win. Imagine you have a system that you're
fully saturating with map tasks, such that you have, say, 50 available CPUs
(after task tracker, job tracker, etc.) and you send your job to 50
mappers... how is this different from 25 mappers with 2 threads apiece? I
guess it depends on whether each task is actually spending its 1 to 5
seconds blocking on some action. You could enqueue all the URL fetches and
then have another thread process the queue. Either way, the semantics for
such a UDF would be awkward and run counter to the typical m/r use case,
imho. However, if you wanted to do something like this (and assuming that
you want to avoid waiting on a bunch of blocking I/O), what you could do is
make an accumulator UDF and then do a group all. So you do:
customers = LOAD 'hdfs://node1.c.foundation.local/data/customers.csv'
    USING PigStorage(',')
    AS (id:chararray, name:chararray, url:chararray);

-- instead of one blocking fetch per tuple:
-- fetchResults = FOREACH customers
--     GENERATE id, name, url, fetchHttp(url);

fetchResults = FOREACH (GROUP customers ALL)
    GENERATE customers.id, customers.name, fetchHttp(customers.url);
this would cause the accumulator to be invoked, and you could enqueue the
elements of the input bag as you get them, fire up a thread that begins
fetching, and then, once the queue is empty, begin processing the results
of the fetch.
note: that's pure theory and I don't know if it would actually be
performant, but you could do it :)
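For what it's worth, the threading part of that accumulator could look
roughly like this. It's just the enqueue-then-fetch pattern in plain Java;
fetchUrl is a stand-in for whatever the real blocking fetch does, and none
of the Pig Accumulator plumbing is shown:

```java
import java.util.*;
import java.util.concurrent.*;

public class ParallelFetch {
    // Stand-in for the real (slow, blocking) fetch; in the actual UDF
    // this would be the HTTP call that takes 1 to 5 seconds per URL.
    static String fetchUrl(String url) {
        return "content-of-" + url;
    }

    // Enqueue all URLs from the accumulated bag, fetch them with a small
    // thread pool, and only start consuming results once everything has
    // been submitted.
    static Map<String, String> fetchAll(List<String> urls, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        Map<String, Future<String>> pending = new LinkedHashMap<>();
        for (String url : urls) {
            pending.put(url, pool.submit(() -> fetchUrl(url)));
        }
        Map<String, String> results = new LinkedHashMap<>();
        for (Map.Entry<String, Future<String>> e : pending.entrySet()) {
            results.put(e.getKey(), e.getValue().get()); // blocks until done
        }
        pool.shutdown();
        return results;
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> r = fetchAll(Arrays.asList("a", "b", "c"), 2);
        System.out.println(r.size()); // 3
    }
}
```

With N threads the wall-clock time for a bag of M URLs drops from roughly
M * delay to roughly (M / N) * delay, which is the whole point of doing the
fetches from a single accumulator call rather than one exec per tuple.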
if you're not waiting on a bunch of I/O, though, I don't see the gain. If
you have 1-5s of actual CPU work to do per URL (not just waiting on the
results of some long operations), then making it asynchronous won't change
that.
2011/11/9 Daan Gerits <[email protected]>
> I expect you are talking about the 1-5 second delay I mentioned. What I
> actually meant was that the code within the exec function of the UDF takes
> 1 to 5 seconds per invocation. That's something I cannot change, since the
> fetch method is actually doing a lot more than just fetching. I cannot
> push that additional logic higher up since that would break the algorithm.
>
>
> On 09 Nov 2011, at 16:05, Marek Miglinski wrote:
>
> > Something is wrong with your calculation UDF; rethink it. I had a case
> > where I needed to calculate the efficiency of data sent/downloaded by
> > each user, the logic there was quite complex, and despite that the speed
> > was ~0.02s per user (each with ~500 transactions), so overall ~0.00004s
> > per tx.
> >
> > Example of the code:
> > userGroup = GROUP recordTx BY user PARALLEL 100;
> > userFlattened = FOREACH userGroup {
> >     generated = Merge(recordTx);
> >     GENERATE FLATTEN(generated);
> > };
> >
> >
> > Sincerely,
> > Marek M.
> > ________________________________________
> > From: Daan Gerits [[email protected]]
> > Sent: Wednesday, November 09, 2011 4:19 PM
> > To: [email protected]
> > Subject: Re: Multithreaded UDF
> >
> > Hi Marek,
> >
> > yes, I have:
> >
> > SET default_parallel 50;
> >
> > at the top of my script.
> >
> > The idea to use the udf is as follows:
> >
> > customers = LOAD 'hdfs://node1.c.foundation.local/data/customers.csv'
> >     USING PigStorage(',')
> >     AS (id:chararray, name:chararray, url:chararray);
> >
> > fetchResults = FOREACH customers
> > GENERATE id, name, url, fetchHttp(url);
> >
> > ending up with the following data structure:
> > (id, name, url, {timestamp, content, fetchDuration})
> >
> > I am currently not yet using the group, since I would like to find a
> > solution without first having to group everything. The workaround for me
> > would be to group everything on a field which I know is unique; that way
> > I won't lose the structure of the relation.
> >
> > Thanks for the quick reply,
> >
> > Daan
> >
> > On 09 Nov 2011, at 15:12, Marek Miglinski wrote:
> >
> >> Do you use parallels in the GROUP?
> >> ________________________________________
> >> From: Daan Gerits [[email protected]]
> >> Sent: Wednesday, November 09, 2011 3:34 PM
> >> To: [email protected]
> >> Subject: Multithreaded UDF
> >>
> >> Hello,
> >>
> >> First of all, great job creating Pig, really a magnificent piece of
> >> software.
> >>
> >> I do have a few questions about UDFs. I have a dataset with a list of
> >> URLs I want to fetch. Since an EvalFunc can only process one tuple at a
> >> time and the asynchronous abilities of the UDF are deprecated, I can
> >> only fetch one URL at a time. The problem is that fetching this one URL
> >> takes a reasonable amount of time (1 to 5 seconds, there is a delay
> >> built in), so that really slows down the processing. I already
> >> converted the UDF into an Accumulator, but that only seems to get fired
> >> after a group by. It would be nice to have some kind of Queue UDF which
> >> would queue the tuples until a certain amount is reached and then flush
> >> the queue. That way I can add tuples to an internal list and, on flush,
> >> start multiple threads to go through the list of tuples.
> >>
> >> This is a workaround, though, since the best solution would be to
> >> reintroduce the asynchronous UDFs (in which case I can schedule the
> >> threads as the tuples come in).
> >>
> >> Any ideas on this? I already saw someone trying almost the same thing,
> >> but didn't get a definite answer from that one.
> >>
> >> Another option is to increase the number of reducer slots on the
> >> cluster, but I'm afraid that would mean we overload the nodes in case
> >> of a heavy reduce phase.
> >>
> >> Best Regards,
> >>
> >> Daan
> >
>
>