opts.async = (key, limit): async urlgrab() with conn limiting.

parallel_wait(): wait until all grabs have finished.
---
 urlgrabber/grabber.py |  183 +++++++++++++++++++++++++++++++++++++++++++++++++
 urlgrabber/mirror.py  |   11 +++
 2 files changed, 194 insertions(+), 0 deletions(-)
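Not part of the patch: a rough usage sketch of the new API, assuming the
module-level urlgrab() and parallel_wait() helpers in urlgrabber.grabber
(the URL, file names and the 'key' string below are made up):

  from urlgrabber.grabber import urlgrab, parallel_wait

  # With async=(key, limit) the grab is only queued; urlgrab() returns
  # the filename immediately.  All requests sharing the same key are
  # limited to at most 'limit' parallel connections.
  for name in ('foo.rpm', 'bar.rpm', 'baz.rpm'):
      urlgrab('http://example.com/repo/' + name, name,
              async=('example.com', 2))

  # Run the queued grabs in parallel and block until all of them have
  # finished or failed.
  parallel_wait()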
diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index b8574bc..edac07b 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -259,6 +259,12 @@ GENERAL ARGUMENTS (kwargs)
     What type of name to IP resolving to use, default is to do both
     IPV4 and IPV6.
 
+  async = (key, limit)
+
+    When this option is set, the urlgrab() call is not processed immediately
+    but queued.  parallel_wait() then processes grabs in parallel, limiting
+    the number of connections in each 'key' group to at most 'limit'.
+
 
 RETRY RELATED ARGUMENTS
 
@@ -882,6 +888,7 @@ class URLGrabberOptions:
         self.size = None # if we know how big the thing we're getting is going
                          # to be. this is ultimately a MAXIMUM size for the file
         self.max_header_size = 2097152 #2mb seems reasonable for maximum header size
+        self.async = None # blocking by default
 
     def __repr__(self):
         return self.format()
@@ -1022,6 +1029,15 @@ class URLGrabber(object):
                     _run_callback(opts.checkfunc, obj)
                 return path
 
+        if opts.async:
+            opts.url = url
+            opts.filename = filename
+            opts.size = int(opts.size or 0)
+            key, limit = opts.async
+            limit, queue = _async.setdefault(key, [limit, []])
+            queue.append(opts)
+            return filename
+
         def retryfunc(opts, url, filename):
             fo = PyCurlFileObject(url, filename, opts)
             try:
@@ -1836,6 +1852,173 @@ def retrygrab(url, filename=None, copy_local=0, close_connection=0,
 
 
 #####################################################################
+#  Downloader
+#####################################################################
+
+class _AsyncCurlFile(PyCurlFileObject):
+    def _do_open(self):
+        self.curl_obj = pycurl.Curl() # don't reuse _curl_cache
+        self._set_opts()
+        self._do_open_fo() # open the file but don't grab
+
+class _DirectDownloader:
+    def __init__(self):
+        ''' A downloader context.
+        '''
+        self.running = {}
+        self.multi = pycurl.CurlMulti()
+
+    def start(self, opts):
+        ''' Start download of job 'opts'
+        '''
+        fo = _AsyncCurlFile(opts.url, opts.filename, opts)
+        self.running[fo.curl_obj] = fo
+        self.multi.add_handle(fo.curl_obj)
+
+    def perform(self):
+        ''' Run downloads, return finished ones.
+        '''
+        while self.multi.perform()[0] == pycurl.E_CALL_MULTI_PERFORM:
+            pass
+        ret = []
+        _, finished, failed = self.multi.info_read()
+        for curl in finished + failed:
+            curl_err = None
+            if type(curl) == tuple:
+                curl, code, msg = curl
+                curl_err = pycurl.error(code, msg)
+            self.multi.remove_handle(curl)
+            fo = self.running.pop(curl)
+            try: ug_err = None; fo._do_perform_exc(curl_err)
+            except URLGrabError, ug_err: pass
+            fo._do_close_fo()
+            ret.append((fo.opts, ug_err, fo._amount_read))
+        return ret
+
+    def abort(self):
+        while self.running:
+            curl, fo = self.running.popitem()
+            self.multi.remove_handle(curl)
+            fo._do_close_fo()
+
+
+#####################################################################
+#  High level async API
+#####################################################################
+
+_async = {}
+
+def parallel_wait(meter = 'text'):
+    '''Process queued requests in parallel.
+    '''
+
+    if meter:
+        count = total = 0
+        for limit, queue in _async.values():
+            for opts in queue:
+                count += 1
+                total += opts.size
+        if meter == 'text':
+            from progress import TextMultiFileMeter
+            meter = TextMultiFileMeter()
+        meter.start(count, total)
+
+    dl = _DirectDownloader()
+
+    def start(opts, tries):
+        opts.tries = tries
+        opts.progress_obj = meter and meter.newMeter()
+        if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url)
+        dl.start(opts)
+
+    def start_next(opts):
+        key, limit = opts.async
+        pos, queue = _async[key]; _async[key][0] += 1
+        if pos < len(queue):
+            start(queue[pos], 1)
+
+    def perform():
+        for opts, ug_err, _amount_read in dl.perform():
+            if meter:
+                m = opts.progress_obj
+                m.basename = os.path.basename(opts.filename)
+                if ug_err:
+                    m.failure(ug_err.args[1])
+                else:
+                    # file size might have changed
+                    meter.re.total += _amount_read - opts.size
+                    m.end(_amount_read)
+                meter.removeMeter(m)
+
+            if ug_err is None:
+                if DEBUG: DEBUG.info('success')
+                if opts.checkfunc:
+                    try: _run_callback(opts.checkfunc, opts)
+                    except URLGrabError, ug_err:
+                        if meter:
+                            meter.numfiles += 1
+                            meter.re.total += opts.size
+                if ug_err is None:
+                    start_next(opts)
+                    continue
+
+            if DEBUG: DEBUG.info('failure: %s', ug_err)
+            retry = opts.retry or 0
+            if opts.failure_callback:
+                opts.exception = ug_err
+                try: _run_callback(opts.failure_callback, opts)
+                except URLGrabError, ug_err: retry = 0 # no retries
+            if opts.tries < retry and ug_err.args[0] in opts.retrycodes:
+                start(opts, opts.tries + 1) # simple retry
+                continue
+            start_next(opts)
+
+            if hasattr(opts, 'mirror_group'):
+                mg, gr, mirrorchoice = opts.mirror_group
+                opts.exception = ug_err
+                opts.mirror = mirrorchoice['mirror']
+                opts.relative_url = gr.url
+                try:
+                    mg._failure(gr, opts)
+                    mirrorchoice = mg._get_mirror(gr)
+                    opts.mirror_group = mg, gr, mirrorchoice
+                except URLGrabError, ug_err: pass
+                else:
+                    # use new mirrorchoice
+                    key = mirrorchoice['mirror']
+                    kwargs = mirrorchoice.get('kwargs', {})
+                    limit = kwargs.get('max_connections') or 3
+                    opts.async = key, limit
+                    opts.url = mg._join_url(mirrorchoice['mirror'], gr.url)
+
+                    # add request to the new queue
+                    pos, queue = _async.setdefault(key, [limit, []])
+                    queue[pos:pos] = [opts] # inserting at head
+                    if len(queue) <= pos:
+                        start(opts, 1)
+                    continue
+
+            # urlgrab failed
+            opts.exception = ug_err
+            _run_callback(opts.failfunc, opts)
+
+    try:
+        for limit, queue in _async.values():
+            for opts in queue[:limit]:
+                start(opts, 1)
+            # now 'limit' is used as 'pos', index
+            # of the first request not started yet.
+        while dl.running:
+            perform()
+
+    finally:
+        dl.abort()
+        _async.clear()
+        if meter:
+            meter.end()
+
+
+#####################################################################
 #  TESTING
 def _main_test():
     try: url, filename = sys.argv[1:3]
diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py
index 575ef7b..41c99f5 100644
--- a/urlgrabber/mirror.py
+++ b/urlgrabber/mirror.py
@@ -76,6 +76,9 @@ CUSTOMIZATION
      'grabber' is omitted, the default grabber will be used.  If kwargs
      are omitted, then (duh) they will not be used.
 
+     kwarg 'max_connections' is used to store the max connection
+     limit of this mirror, and to update the value of the 'async' option.
+
   3) Pass keyword arguments when instantiating the mirror group.
      See, for example, the failure_callback argument.
@@ -392,6 +395,14 @@ class MirrorGroup:
             grabber = mirrorchoice.get('grabber') or self.grabber
             func_ref = getattr(grabber, func)
             if DEBUG: DEBUG.info('MIRROR: trying %s -> %s', url, fullurl)
+            if kw.get('async'):
+                # 'async' option to the 1st mirror
+                key = mirrorchoice['mirror']
+                limit = kwargs.get('max_connections') or 3
+                kwargs['async'] = key, limit
+                # async code iterates mirrors and calls failfunc
+                kwargs['mirror_group'] = self, gr, mirrorchoice
+                kwargs['failfunc'] = gr.kw.get('failfunc', _do_raise)
             try:
                 return func_ref( *(fullurl,), **kwargs )
             except URLGrabError, e:
-- 
1.7.4.4
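Not part of the patch: a similar sketch for the MirrorGroup path.  The
mirror list and file names are hypothetical; 'max_connections' in a
mirror's kwargs becomes that mirror's connection limit (default 3), and
any true 'async' value appears to be enough here, since the MirrorGroup
code above rewrites it to (mirror, limit):

  from urlgrabber.grabber import URLGrabber, parallel_wait
  from urlgrabber.mirror import MirrorGroup

  mirrors = [
      {'mirror': 'http://mirror-a.example.com/repo/',
       'kwargs': {'max_connections': 5}},
      {'mirror': 'http://mirror-b.example.com/repo/'},
  ]
  mg = MirrorGroup(URLGrabber(), mirrors)

  # Queue the downloads against the first mirror choice; on failure the
  # async code walks the remaining mirrors and finally calls failfunc.
  for name in ('foo.rpm', 'bar.rpm'):
      mg.urlgrab(name, name, async=True)

  parallel_wait()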