Assuming the 1-5 seconds is mainly spent waiting on I/O, using more mappers or
reducers might not be suitable, since it just takes up too many mapper and
reducer slots. A couple of options:
1. Use streaming: you have full control over how many URLs you handle at a
time. It might be tricky to pass the URL content back, though.
2. A hack: say you want to handle 1000 URLs at a time. Write a simple loader
that extends PigStorage(), where getNext() looks something like:

    @Override
    public Tuple getNext() throws IOException {
        DataBag bag = BagFactory.getInstance().newDefaultBag();
        for (int i = 0; i < 1000; i++) {
            Tuple tuple = super.getNext();
            if (tuple == null) break;
            bag.add(tuple);
        }
        // wrap the batch in a single-field tuple so the UDF sees one bag per call
        return bag.size() > 0 ? TupleFactory.getInstance().newTuple(bag) : null;
    }
Then your UDF handles a bag of tuples and returns a bag of tuples.
Raghu.
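A rough Java sketch of what the second option's UDF body could do with each batch (class and method names are mine, not from the thread, and the slow fetch is stubbed out since the real fetchHttp logic isn't shown here): given one batch of URLs pulled out of the input bag, submit them all to a thread pool and collect the results in order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BatchFetcher {
    // Processes one batch of URLs concurrently. The real UDF would pull these
    // out of the input bag and wrap the results back into an output bag.
    public static List<String> fetchAll(List<String> urls, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (final String url : urls) {
                tasks.add(() -> fetch(url)); // each fetch blocks 1-5s in the real case
            }
            List<String> results = new ArrayList<>();
            // invokeAll waits for every task and preserves input order
            for (Future<String> f : pool.invokeAll(tasks)) {
                results.add(f.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    // Stand-in for the real (slow) fetch; an assumption for illustration only.
    static String fetch(String url) {
        return "content-of-" + url;
    }
}
```

With 1000 URLs per batch and, say, 20 threads, the per-URL wait overlaps instead of accumulating serially.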
On Wed, Nov 9, 2011 at 9:12 AM, Jonathan Coveney <[email protected]> wrote:
> I don't get how this would be a win. Let's imagine you have a system that
> you're fully saturating with map tasks, such that you have, say, 50
> available CPUs (after task tracker, job tracker, etc.) and you send your job
> to 50 mappers... how is this different from 25 mappers with 2 threads
> apiece? I guess it depends on whether each task spends that 1 to 5 seconds
> blocking on some action. I guess you could enqueue all the URL fetches, and
> then have another thread process that queue. Either way, the
> semantics for such a UDF would be awkward and run counter to the typical
> m/r use case, imho. However, if you wanted to do something like this (and
> assuming that you want to avoid waiting a bunch for some blocking i/o),
> what you could do would be to make an accumulator UDF, but then do a group
> all. So you do:
>
> customers = LOAD 'hdfs://node1.c.foundation.local/data/customers.csv'
> USING PigStorage(',')
> AS (id:chararray, name:chararray, url:chararray);
>
> fetchResults = FOREACH customers
> GENERATE id, name, url, fetchHttp(url);
>
> and then, to get the accumulator invoked, replace that FOREACH with:
>
> fetchResults = foreach (group customers all) generate customers.id,
>                customers.name, fetchHttp(customers.url);
>
> this would cause the accumulator to be invoked, and you could just enqueue
> the elements of the input bag that you get and fire up a thread that begins
> fetching, and then once it is empty, begin processing the results of the
> fetch.
>
> note: that's pure theory and I don't know if it would actually be
> performant, but you could do it :)
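The enqueue-then-drain idea Jonathan describes could be sketched in plain Java like this (hypothetical names; strings stand in for Pig tuples, and fetch() is a stub): accumulate() enqueues and returns immediately while a background thread fetches, and getValue() waits for the drain to finish.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class AsyncAccumulator {
    // Sentinel marking end-of-input; compared by identity, so user data can't collide.
    private static final String POISON = new String("\u0000EOF");
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> results = Collections.synchronizedList(new ArrayList<String>());
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private Future<?> draining;

    // accumulate(): called once per incoming tuple; enqueue and return immediately,
    // so the slow fetches overlap with Pig feeding us input.
    public void accumulate(String url) {
        queue.add(url);
        if (draining == null) {
            draining = worker.submit(() -> {
                try {
                    for (String u = queue.take(); u != POISON; u = queue.take()) {
                        results.add(fetch(u));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
    }

    // getValue(): signal end-of-input, wait for the background fetches, return everything.
    public List<String> getValue() throws Exception {
        if (draining == null) return results; // nothing was accumulated
        queue.add(POISON);
        draining.get();
        worker.shutdown();
        return results;
    }

    // Stand-in for the real fetchHttp; an assumption for illustration only.
    static String fetch(String url) { return "fetched:" + url; }
}
```

This matches the accumulator lifecycle loosely: Pig would call accumulate() per chunk of the grouped bag and getValue() once at the end.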
>
> if you're not waiting on a bunch of IO, though, I don't see the gain. If
> you have 1-5s of actual work to do per url (not just waiting on the results
> of some long operations), then making it asynchronous won't change that.
>
> 2011/11/9 Daan Gerits <[email protected]>
>
> > I expect you are talking about the 1-5 second delay I mentioned. What I
> > actually meant was that the code within the exec function of the UDF is
> > taking 1 to 5 seconds for each invocation. That's something I cannot
> > change, since the fetch method is actually doing a lot more than only
> > fetching something. I cannot push the additional logic the fetching is
> > invoking higher up, since that would break the algorithm.
> >
> >
> > On 09 Nov 2011, at 16:05, Marek Miglinski wrote:
> >
> > > Something must be wrong with your UDF's calculations; think it over,
> > > because in my experience, when I needed to calculate the efficiency of
> > > data sent/downloaded by user, the logic there was quite complex, and
> > > despite that the speed was ~0.02s per user with ~500 transactions each,
> > > so overall ~0.00004s per tx.
> > >
> > > Example of the code:
> > > userGroup = GROUP recordTx BY user PARALLEL 100;
> > > userFlattened = FOREACH userGroup {
> > >     generated = Merge(recordTx);
> > >     GENERATE FLATTEN(generated);
> > > }
> > >
> > >
> > > Sincerely,
> > > Marek M.
> > > ________________________________________
> > > From: Daan Gerits [[email protected]]
> > > Sent: Wednesday, November 09, 2011 4:19 PM
> > > To: [email protected]
> > > Subject: Re: Multithreaded UDF
> > >
> > > Hi Marek,
> > >
> > > yes, I have:
> > >
> > > SET default_parallel 50;
> > >
> > > at the top of my script.
> > >
> > > The idea to use the udf is as follows:
> > >
> > > customers = LOAD 'hdfs://node1.c.foundation.local/data/customers.csv'
> > >             USING PigStorage(',')
> > >             AS (id:chararray, name:chararray, url:chararray);
> > >
> > > fetchResults = FOREACH customers
> > > GENERATE id, name, url, fetchHttp(url);
> > >
> > > ending up with the following data structure:
> > > (id, name, url, {timestamp, content, fetchDuration})
> > >
> > > I am currently not yet using the group, since I would like to find a
> > > solution without first having to group everything. The workaround for me
> > > would be to group everything on a field which I know is unique; that way
> > > I won't lose the structure of the relation.
> > >
> > > Thanks for the quick reply,
> > >
> > > Daan
> > >
> > > On 09 Nov 2011, at 15:12, Marek Miglinski wrote:
> > >
> > >> Do you use PARALLEL in the GROUP?
> > >> ________________________________________
> > >> From: Daan Gerits [[email protected]]
> > >> Sent: Wednesday, November 09, 2011 3:34 PM
> > >> To: [email protected]
> > >> Subject: Multithreaded UDF
> > >>
> > >> Hello,
> > >>
> > >> First of all, great job creating pig, really a magnificent piece of
> > software.
> > >>
> > >> I do have a few questions about UDFs. I have a dataset with a list of
> > >> URLs I want to fetch. Since an EvalFunc can only process one tuple at a
> > >> time, and the asynchronous abilities of the UDF are deprecated, I can
> > >> only fetch one URL at a time. The problem is that fetching this one URL
> > >> takes a reasonable amount of time (1 to 5 seconds, there is a delay
> > >> built in), so that really slows down the processing. I already converted
> > >> the UDF into an Accumulator, but that only seems to get fired after a
> > >> GROUP BY. It would be nice to have some kind of Queue UDF which would
> > >> queue the tuples until a certain amount is reached and then flush the
> > >> queue. That way I can add tuples to an internal list and, on flush,
> > >> start multiple threads to go through the list of tuples.
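The Queue UDF Daan describes might look roughly like this in plain Java (hypothetical names; strings stand in for tuples, and the fetch is stubbed): buffer inputs until a threshold is reached, then flush the whole batch through a thread pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class QueueingFetcher {
    private final int batchSize;
    private final List<String> pending = new ArrayList<>();
    private final List<String> results = new ArrayList<>();

    public QueueingFetcher(int batchSize) { this.batchSize = batchSize; }

    // add(): buffer one URL; flush automatically once the batch is full.
    public void add(String url) throws Exception {
        pending.add(url);
        if (pending.size() >= batchSize) flush();
    }

    // flush(): fetch the whole pending batch in parallel (thread count capped),
    // then clear the buffer. Must also be called once at the end for leftovers.
    public void flush() throws Exception {
        if (pending.isEmpty()) return;
        ExecutorService pool = Executors.newFixedThreadPool(Math.min(pending.size(), 8));
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (final String url : pending) {
                tasks.add(() -> fetch(url)); // each of these blocks 1-5s in the real case
            }
            for (Future<String> f : pool.invokeAll(tasks)) {
                results.add(f.get());
            }
        } finally {
            pool.shutdown();
            pending.clear();
        }
    }

    public List<String> results() { return results; }

    // Stand-in for the real fetch logic; an assumption for illustration only.
    static String fetch(String url) { return "got:" + url; }
}
```

The final flush is the awkward part in a plain EvalFunc, since exec() is never told "this was the last tuple", which is exactly why the thread keeps circling back to the Accumulator interface.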
> > >>
> > >> This is a workaround though, since the best solution would be to
> > >> reintroduce the asynchronous UDFs (in which case I could schedule the
> > >> threads as the tuples come in).
> > >>
> > >> Any ideas on this? I already saw someone trying almost the same thing,
> > >> but they didn't get a definite answer either.
> > >>
> > >> Another option is to increase the number of reducer slots on the
> > >> cluster, but I'm afraid that would mean overloading the nodes in case
> > >> of a heavy reduce phase.
> > >>
> > >> Best Regards,
> > >>
> > >> Daan
> > >
> >
> >
>