Hey everyone,
Thank you for the massive amount of feedback. Allow me to provide my own:
The 1 to 5 seconds is just idling: the UDF isn't doing anything except
waiting a random amount of time (between 1 and 5 seconds). It's part of some
human-behavior mimicking I do.
I was also thinking about streaming, but haven't looked into it yet since it
seems pretty low-level: parsing stdin and writing to stdout. And as you say, I
don't know how easy it would be to send URLs and content through the streams
without too much hassle. Maybe an option is to use a serialization mechanism
like Avro or plain Java serialization to pass the information. Anyway, that's
the backup plan if all else fails :)
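To make that backup plan a bit more concrete: Pig's STREAM operator by default
passes one tuple per line with tab-separated fields, so a streaming executable
mostly has to worry about escaping tabs and newlines that show up in fetched
page content. A minimal sketch, with the HTTP part deliberately left as an
injected stand-in function (everything here is hypothetical, not tested
against a real cluster):

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.function.Function;

// Sketch of a streaming executable for Pig's STREAM operator.
// Pig's default streaming serialization is one tuple per line with
// tab-separated fields, so the main hassle is escaping tabs and
// newlines that may appear in the fetched page content.
public class FetchStream {

    // Escape characters that would break the line/field framing.
    static String escape(String s) {
        return s.replace("\\", "\\\\")
                .replace("\t", "\\t")
                .replace("\n", "\\n");
    }

    // Turn one input tuple "key<TAB>url" into an output tuple
    // "key<TAB>url<TAB>content". The fetcher is injected so the
    // HTTP part (a stand-in here) stays out of the framing logic.
    static String processLine(String line, Function<String, String> fetch) {
        String[] fields = line.split("\t", 2);
        String key = fields[0];
        String url = fields.length > 1 ? fields[1] : "";
        return key + "\t" + url + "\t" + escape(fetch.apply(url));
    }

    public static void main(String[] args) throws Exception {
        Function<String, String> fetch = url -> {
            // a real implementation would do the HTTP GET here
            throw new UnsupportedOperationException("plug in an HTTP client");
        };
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String line;
        while ((line = in.readLine()) != null) {
            System.out.println(processLine(line, fetch));
        }
    }
}
```

With framing kept this simple, Avro or Java serialization would only be needed
if the content can't be safely escaped into a single line.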
Actually my first idea was to create a loader to load the content from the web,
but that didn't seem ideal since I want to use the UDF not only during loading
but also during a FOREACH. Maybe I'm getting this wrong, but you can't pass
relations to the constructor of a load function, or can you?
In the meantime, I have something which is nearly working:
queries = FOREACH customers GENERATE id as key, url;
requests = GROUP queries BY SUBSTRING(url, 0, 13);
fetchResults = FOREACH requests {
    results = fetchHttp(queries);
    GENERATE FLATTEN(results);
};
results = FOREACH fetchResults GENERATE now() as timestamp, url,
    FLATTEN(fetches);
cleanedResults = FOREACH results GENERATE timestamp, url, page as page,
    duration as duration, removeWhiteSpace(content, 0) as content;
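For the fetchHttp UDF itself, the batched part — fanning one group's bag of
URLs out over a thread pool and collecting per-URL results — can be sketched
in plain Java, leaving out the Pig Tuple/DataBag plumbing. The fetch function
below is a placeholder, not a real HTTP client:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Sketch of the parallel core a fetchHttp UDF could use: take the
// bag of URLs produced by the GROUP, fan them out over a fixed
// thread pool, and collect one (url, durationMillis, content)
// result per input URL, in input order.
public class ParallelFetch {

    public static class Result {
        public final String url;
        public final long durationMillis;
        public final String content;
        Result(String url, long durationMillis, String content) {
            this.url = url;
            this.durationMillis = durationMillis;
            this.content = content;
        }
    }

    public static List<Result> fetchAll(List<String> urls,
                                        Function<String, String> fetch,
                                        int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Result>> futures = new ArrayList<>();
            for (String url : urls) {
                futures.add(pool.submit(() -> {
                    long start = System.currentTimeMillis();
                    String content = fetch.apply(url);  // blocking call runs in the pool
                    return new Result(url, System.currentTimeMillis() - start, content);
                }));
            }
            List<Result> results = new ArrayList<>();
            for (Future<Result> f : futures) {
                results.add(f.get());  // join in submit order
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

The UDF would then wrap each Result back into a tuple so the FLATTEN above
produces one row per fetched URL.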
Any ideas on this approach?
Daan.
On 09 Nov 2011, at 19:54, Alan Gates wrote:
> Multi-threading of UDFs is not deprecated, it just isn't explicitly
> supported. However, it should work. The internal MonitoredUDF uses multiple
> threads.
>
> Do you need to output records conditionally or modify the contents of the
> record based on the results of this http call? If not, then you can place
> records in a queue as they go through and have a pool of worker threads doing
> the http calls in the background. You can then use the finish() call to make
> sure your queue is empty and all your worker threads have finished.
>
> The problem if you need to modify or remove records is that finish() doesn't
> let you return data. So even though you could return a bag full of records
> you had finished for each record that came in (with some bags being empty,
> which a subsequent flatten could then remove), you would lose the last few
> records because you wouldn't get a chance to return them. As suggested in a
> previous mail, streaming will do what you want in this case.
>
> Alan.
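[Daan:] For what it's worth, the fire-and-forget pattern you describe
(enqueue each record in exec(), drain in finish()) could look roughly like
this outside the Pig API — submit() stands in for exec(), drain() for
finish(), and the httpCall consumer is a placeholder:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Sketch of the fire-and-forget pattern: each record passing through
// exec() is handed to a worker pool, and finish() blocks until every
// queued HTTP call has completed. The Pig API is left out; submit()
// stands in for exec() and drain() for finish().
public class BackgroundFetcher {
    private final ExecutorService pool;
    private final Consumer<String> httpCall;  // stand-in for the real HTTP call
    private final AtomicInteger completed = new AtomicInteger();

    public BackgroundFetcher(int threads, Consumer<String> httpCall) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.httpCall = httpCall;
    }

    // Called once per record, like exec(): enqueue and return immediately.
    public void submit(String url) {
        pool.submit(() -> {
            httpCall.accept(url);
            completed.incrementAndGet();
        });
    }

    // Called at the end, like finish(): wait until the queue is empty
    // and all worker threads are done.
    public int drain() throws InterruptedException {
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
        return completed.get();
    }
}
```

As you point out, this only works because nothing needs to be returned after
finish(); if the output depended on the responses, streaming would be the way.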
>
>
> On Nov 9, 2011, at 5:34 AM, Daan Gerits wrote:
>
>> Hello,
>>
>> First of all, great job creating pig, really a magnificent piece of software.
>>
>> I do have a few questions about UDFs. I have a dataset with a list of URLs
>> I want to fetch. Since an EvalFunc can only process one tuple at a time and
>> the asynchronous abilities of the UDF are deprecated, I can only fetch one
>> URL at a time. The problem is that fetching this one URL takes a reasonable
>> amount of time (1 to 5 seconds; there is a delay built in), which really
>> slows down the processing. I already converted the UDF into an Accumulator,
>> but that only seems to get fired after a GROUP BY. It would be nice to have
>> some kind of queue UDF which queues the tuples until a certain amount is
>> reached and then flushes the queue. That way I can add tuples to an internal
>> list and, on flush, start multiple threads to go through the list of tuples.
>>
>> This is a workaround though, since the best solution would be to reintroduce
>> the asynchronous UDFs (in which case I could schedule the threads as the
>> tuples come in).
>>
>> Any ideas on this? I already saw someone trying almost the same thing, but
>> didn't get a definite answer from that one.
>>
>> Another option is to increase the number of reducer slots on the cluster,
>> but I'm afraid that would mean we overload the nodes in case of a heavy
>> reduce phase.
>>
>> Best Regards,
>>
>> Daan
>