default_grabber.opts.parallel: concurrent connections limit.
0 = always use blocking urlgrab()

parallel_begin(): start queueing grab requests
parallel_end(): process queue in parallel
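To illustrate the intended calling convention, a rough usage sketch (not part of the
patch; the URLs and the connection count of 4 are made-up values, and it assumes the
patch below is applied):

    from urlgrabber.grabber import urlgrab, parallel_begin, parallel_end, default_grabber

    default_grabber.opts.parallel = 4     # connection limit; 0 keeps urlgrab() blocking
    parallel_begin()                      # urlgrab() calls below only queue requests
    for url in ('http://mirror.example/a.rpm',   # placeholder URLs
                'http://mirror.example/b.rpm'):
        urlgrab(url)                      # returns the target filename immediately
    parallel_end()                        # downloads the queue, up to 4 connections at once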
---
 urlgrabber/grabber.py |  171 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 171 insertions(+), 0 deletions(-)

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 303428c..f0f4c6b 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -872,6 +872,7 @@ class URLGrabberOptions:
         self.size = None # if we know how big the thing we're getting is going
                          # to be. this is ultimately a MAXIMUM size for the file
         self.max_header_size = 2097152 #2mb seems reasonable for maximum header size
+        self.parallel = 5 # max connections for parallel grabs
 
     def __repr__(self):
         return self.format()
@@ -1008,6 +1009,13 @@ class URLGrabber(object):
                     apply(cb_func, (obj, )+cb_args, cb_kwargs)
                 return path
 
+        if _async_on:
+            opts.url = url
+            opts.filename = filename
+            opts.size = int(opts.size or 0)
+            _async_list.append(opts)
+            return filename
+
         def retryfunc(opts, url, filename):
             fo = PyCurlFileObject(url, filename, opts)
             try:
@@ -1827,6 +1835,169 @@ def retrygrab(url, filename=None, copy_local=0, close_connection=0,
 
 
 #####################################################################
+#  Helpers
+#####################################################################
+
+class _AsyncCurlFile(PyCurlFileObject):
+    def _do_open(self):
+        self.curl_obj = pycurl.Curl() # don't reuse _curl_cache
+        self._set_opts()
+        self._do_open_fo() # open the file but don't grab
+
+def _callback(cb, obj):
+    if callable(cb):
+        return cb(obj)
+    cb, arg, karg = cb
+    return cb(obj, *arg, **karg)
+
+#####################################################################
+#  High level async API
+#####################################################################
+
+_async_on = False
+_async_list = []
+
+def parallel_begin():
+    '''Start queuing urlgrab() requests.
+    '''
+
+    if default_grabber.opts.parallel == 0:
+        return
+
+    global _async_on
+    assert _async_on == False
+    _async_on = True
+
+def parallel_end(meter = 'text'):
+    '''Process queued requests in parallel.
+    '''
+
+    if default_grabber.opts.parallel == 0:
+        return
+
+    global _async_on
+    assert _async_on == True
+    _async_on = False
+
+    global _async_list
+    if not _async_list: return
+    todo = _async_list; _async_list = []
+    limit = default_grabber.opts.parallel
+
+    if meter:
+        total = 0
+        for opts in todo:
+            total += opts.size
+        if meter == 'text':
+            from progress import TextMultiFileMeter
+            meter = TextMultiFileMeter()
+        meter.start(len(todo), total)
+
+    running = {}
+    multi = pycurl.CurlMulti()
+
+    def start(opts, tries):
+        opts.tries = tries
+        if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url)
+        opts.progress_obj = meter and meter.newMeter()
+        fo = _AsyncCurlFile(opts.url, opts.filename, opts)
+        running[fo.curl_obj] = fo
+        multi.add_handle(fo.curl_obj)
+
+    def perform():
+        while multi.perform()[0] == pycurl.E_CALL_MULTI_PERFORM:
+            pass
+        q, finished, failed = multi.info_read()
+        for curl in finished + failed:
+
+            # curl layer
+            curl_err = None
+            if type(curl) == tuple:
+                curl, code, msg = curl
+                curl_err = pycurl.error(code, msg)
+            multi.remove_handle(curl)
+
+            # grabber layer
+            fo = running.pop(curl); opts = fo.opts
+            try: ug_err = None; fo._do_perform_exc(curl_err)
+            except URLGrabError, ug_err: pass
+            fo._do_close_fo()
+
+            # do progress before callbacks to show retries
+            if meter:
+                m = opts.progress_obj
+                if ug_err:
+                    # progress_obj might not have start()ed yet
+                    m.basename = fo._prog_basename
+                    m.failure(ug_err.args[1])
+                else:
+                    # file size might have changed
+                    meter.re.total += fo._amount_read - opts.size
+                    m.end(fo._amount_read)
+                meter.removeMeter(m)
+
+            if ug_err is None:
+                if DEBUG: DEBUG.info('success')
+                if opts.checkfunc:
+                    try: _callback(opts.checkfunc, opts)
+                    except URLGrabError, ug_err:
+                        if meter:
+                            meter.numfiles += 1
+                            meter.re.total += opts.size
+                if ug_err is None:
+                    # download & checkfunc: OK
+                    continue
+
+            if DEBUG: DEBUG.info('failure: %s', ug_err)
+            retry = opts.retry or 0
+            if opts.failure_callback:
+                opts.exception = ug_err
+                try: _callback(opts.failure_callback, opts)
+                except URLGrabError, ug_err: retry = 0 # no retries
+            if opts.tries < retry and ug_err.args[0] in opts.retrycodes:
+                # simple retry
+                start(opts, opts.tries + 1)
+                continue
+
+            if hasattr(opts, 'mirror_group'):
+                mg, gr, mirrorchoice = opts.mirror_group
+                opts.exception = ug_err
+                opts.mirror = mirrorchoice['mirror']
+                opts.relative_url = gr.url
+                try:
+                    mg._failure(gr, opts)
+                    mirrorchoice = mg._get_mirror(gr)
+                    opts.url = mg._join_url(mirrorchoice['mirror'], gr.url)
+                    opts.mirror_group = mg, gr, mirrorchoice
+                    # retry next mirror
+                    start(opts, 1)
+                    continue
+                except URLGrabError, ug_err: pass
+
+            # urlgrab failed
+            if not hasattr(opts, 'failfunc'): raise ug_err
+            opts.exception = ug_err
+            _callback(opts.failfunc, opts)
+
+    try:
+        for opts in todo:
+            start(opts, 1)
+            while len(running) >= limit:
+                perform()
+        while running:
+            perform()
+
+    finally:
+        while running:
+            curl, fo = running.popitem()
+            multi.remove_handle(curl)
+            fo._do_close_fo()
+            os.unlink(fo.opts.filename)
+
+        if meter:
+            meter.end()
+
+#####################################################################
 #  TESTING
 def _main_test():
     try: url, filename = sys.argv[1:3]
-- 
1.7.4.4

_______________________________________________
Yum-devel mailing list
Yum-devel@lists.baseurl.org
http://lists.baseurl.org/mailman/listinfo/yum-devel