opts.async = (key, limit): async urlgrab() with conn limiting.

parallel_wait(): wait until all grabs have finished.
---
 urlgrabber/grabber.py |  169 +++++++++++++++++++++++++++++++++++++++++++++++++
 urlgrabber/mirror.py  |    4 +
 2 files changed, 173 insertions(+), 0 deletions(-)
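A rough usage sketch for reviewers (not part of the patch; the URLs, file
names and the 'example.com' grouping key below are made up):

    from urlgrabber.grabber import URLGrabber, parallel_wait

    g = URLGrabber(retry=3)
    for i in range(10):
        url = 'http://example.com/file%d' % i
        # queue the grab; 'example.com' is the connection-limit group,
        # 2 is the maximum number of parallel connections in that group
        g.urlgrab(url, filename='file%d' % i, async=('example.com', 2))

    # block until every queued grab has either succeeded or failed
    parallel_wait(meter='text')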
diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index a7c847b..644b431 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -259,6 +259,12 @@ GENERAL ARGUMENTS (kwargs)
     What type of name to IP resolving to use, default is to do both IPV4 and
     IPV6.
 
+  async = (key, limit)
+
+    When this option is set, the urlgrab() is not processed immediately
+    but queued.  parallel_wait() then processes grabs in parallel, limiting
+    the number of connections in each 'key' group to at most 'limit'.
+
 
 RETRY RELATED ARGUMENTS
 
@@ -882,6 +888,7 @@ class URLGrabberOptions:
         self.size = None # if we know how big the thing we're getting is going
                          # to be. this is ultimately a MAXIMUM size for the file
         self.max_header_size = 2097152 #2mb seems reasonable for maximum header size
+        self.async = None # blocking by default
 
     def __repr__(self):
         return self.format()
@@ -1019,6 +1026,15 @@ class URLGrabber(object):
                 _callback(opts.checkfunc, obj)
             return path
 
+        if opts.async:
+            opts.url = url
+            opts.filename = filename
+            opts.size = int(opts.size or 0)
+            key, limit = opts.async
+            limit, queue = _async.setdefault(key, [limit, []])
+            queue.append(opts)
+            return filename
+
         def retryfunc(opts, url, filename):
             fo = PyCurlFileObject(url, filename, opts)
             try:
@@ -1833,6 +1849,159 @@ def retrygrab(url, filename=None, copy_local=0, close_connection=0,
 
 
 #####################################################################
+# Downloader
+#####################################################################
+
+class _AsyncCurlFile(PyCurlFileObject):
+    def _do_open(self):
+        self.curl_obj = pycurl.Curl() # don't reuse _curl_cache
+        self._set_opts()
+        self._do_open_fo() # open the file but don't grab
+
+
+#####################################################################
+# High level async API
+#####################################################################
+
+_async = {}
+
+def parallel_wait(meter = 'text'):
+    '''Process queued requests in parallel.
+    '''
+
+    if meter:
+        count = total = 0
+        for limit, queue in _async.values():
+            for opts in queue:
+                count += 1
+                total += opts.size
+        if meter == 'text':
+            from progress import TextMultiFileMeter
+            meter = TextMultiFileMeter()
+        meter.start(count, total)
+
+    running = {}
+    multi = pycurl.CurlMulti()
+
+    def start(opts, tries):
+        opts.tries = tries
+        opts.progress_obj = meter and meter.newMeter()
+        if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url)
+        fo = _AsyncCurlFile(opts.url, opts.filename, opts)
+        running[fo.curl_obj] = fo
+        multi.add_handle(fo.curl_obj)
+
+    def start_next(opts):
+        key, limit = opts.async
+        pos, queue = _async[key]; _async[key][0] += 1
+        if pos < len(queue):
+            start(queue[pos], 1)
+
+    def perform():
+        while multi.perform()[0] == pycurl.E_CALL_MULTI_PERFORM:
+            pass
+        q, finished, failed = multi.info_read()
+        for curl in finished + failed:
+
+            # curl layer
+            curl_err = None
+            if type(curl) == tuple:
+                curl, code, msg = curl
+                curl_err = pycurl.error(code, msg)
+            multi.remove_handle(curl)
+
+            # grabber layer
+            fo = running.pop(curl); opts = fo.opts
+            try: ug_err = None; fo._do_perform_exc(curl_err)
+            except URLGrabError, ug_err: pass
+            fo._do_close_fo()
+
+            # do progress before callbacks to show retries
+            if meter:
+                m = opts.progress_obj
+                m.basename = os.path.basename(opts.filename)
+                if ug_err:
+                    m.failure(ug_err.args[1])
+                else:
+                    # file size might have changed
+                    meter.re.total += fo._amount_read - opts.size
+                    m.end(fo._amount_read)
+                meter.removeMeter(m)
+
+            if ug_err is None:
+                if DEBUG: DEBUG.info('success')
+                if opts.checkfunc:
+                    try: _callback(opts.checkfunc, opts)
+                    except URLGrabError, ug_err:
+                        if meter:
+                            meter.numfiles += 1
+                            meter.re.total += opts.size
+                if ug_err is None:
+                    start_next(opts)
+                    continue
+
+            if DEBUG: DEBUG.info('failure: %s', ug_err)
+            retry = opts.retry or 0
+            if opts.failure_callback:
+                opts.exception = ug_err
+                try: _callback(opts.failure_callback, opts)
+                except URLGrabError, ug_err: retry = 0 # no retries
+            if opts.tries < retry and ug_err.args[0] in opts.retrycodes:
+                start(opts, opts.tries + 1) # simple retry
+                continue
+            start_next(opts)
+
+            if hasattr(opts, 'mirror_group'):
+                mg, gr, mirrorchoice = opts.mirror_group
+                opts.exception = ug_err
+                opts.mirror = mirrorchoice['mirror']
+                opts.relative_url = gr.url
+                try:
+                    mg._failure(gr, opts)
+                    mirrorchoice = mg._get_mirror(gr)
+                    opts.mirror_group = mg, gr, mirrorchoice
+                except URLGrabError, ug_err: pass
+                else:
+                    # use new mirrorchoice
+                    key = mirrorchoice['mirror']
+                    limit = mirrorchoice.get('max_connections') or 3
+                    opts.async = key, limit
+                    opts.url = mg._join_url(mirrorchoice['mirror'], gr.url)
+
+                    # add request to the new queue
+                    pos, queue = _async.setdefault(key, [limit, []])
+                    queue[pos:pos] = [opts] # inserting at head
+                    if len(queue) <= pos:
+                        start(opts, 1)
+                    continue
+
+            # urlgrab failed
+            if not opts.failfunc:
+                raise ug_err
+            opts.exception = ug_err
+            _callback(opts.failfunc, opts)
+
+    try:
+        for limit, queue in _async.values():
+            for opts in queue[:limit]:
+                start(opts, 1)
+            # now 'limit' is used as 'pos', index
+            # of the first request not started yet.
+        while running:
+            perform()
+
+    finally:
+        while running:
+            curl, fo = running.popitem()
+            multi.remove_handle(curl)
+            fo._do_close_fo()
+            os.unlink(fo.opts.filename)
+        _async.clear()
+        if meter:
+            meter.end()
+
+
+#####################################################################
 # TESTING
 def _main_test():
     try: url, filename = sys.argv[1:3]
diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py
index 5f0120f..47b6f89 100644
--- a/urlgrabber/mirror.py
+++ b/urlgrabber/mirror.py
@@ -396,6 +396,10 @@ class MirrorGroup:
             # - blocking urlgrab() ignores failfunc
             # - async urlgrab() can iterate mirrors
             kwargs['mirror_group'] = self, gr, mirrorchoice
+            if kw.get('async'):
+                key = mirrorchoice['mirror']
+                limit = mirrorchoice.get('max_connections') or 3
+                kwargs['async'] = key, limit
             try:
                 return func_ref( *(fullurl,), **kwargs )
             except URLGrabError, e:
-- 
1.7.4.4
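A note on the bookkeeping, since it is easy to miss in review: _async maps
each key to a two-element list [limit, queue].  Once parallel_wait() has
started the first 'limit' requests of a queue, that first element is reused
as 'pos', the index of the first request that has not been started yet.
The toy model below is illustrative only (no pycurl involved; start_next
mirrors the patch, the other helpers are made up):

    _async = {}

    def queue_request(key, limit, opts):
        # same shape as the patch: key -> [limit, queue]
        limit, queue = _async.setdefault(key, [limit, []])
        queue.append(opts)

    def start_initial(key):
        # start at most 'limit' requests; afterwards _async[key][0]
        # doubles as the index of the next request to start
        limit, queue = _async[key]
        return queue[:limit]

    def start_next(key):
        pos, queue = _async[key]; _async[key][0] += 1
        if pos < len(queue):
            return queue[pos]
        return None                      # queue drained

    for name in ('a', 'b', 'c'):
        queue_request('mirror1', 2, name)
    print start_initial('mirror1')       # ['a', 'b']
    print start_next('mirror1')          # 'c'
    print start_next('mirror1')          # None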