Add a wrapper around the code that touches curl.
(preparing for external downloader)
---
 urlgrabber/grabber.py |   80 +++++++++++++++++++++++++++++--------------------
 1 files changed, 48 insertions(+), 32 deletions(-)
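
For reviewers, a rough sketch of how the new _DirectDownloader context is
meant to be driven; it mirrors what parallel_wait() does after this patch.
The 'jobs' list and the KeyboardInterrupt handling are illustrative only,
not part of the patch:

    dl = _DirectDownloader()
    for opts in jobs:      # hypothetical list of grabber option objects,
        dl.start(opts)     # each with .url and .filename already set
    try:
        while dl.running:
            # perform() runs the CurlMulti handle and returns the jobs
            # finished since the last call as (opts, ug_err, amount_read)
            # tuples; ug_err is None on success.
            for opts, ug_err, amount_read in dl.perform():
                if ug_err is None:
                    print '%s: OK, %d bytes' % (opts.filename, amount_read)
                else:
                    print '%s: %s' % (opts.url, ug_err)
    except KeyboardInterrupt:
        dl.abort()         # close and unlink all unfinished downloads
        raise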
diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 644b431..b64c943 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1858,6 +1858,47 @@ class _AsyncCurlFile(PyCurlFileObject):
         self._set_opts()
         self._do_open_fo() # open the file but don't grab
 
+class _DirectDownloader:
+    def __init__(self):
+        ''' A downloader context.
+        '''
+        self.running = {}
+        self.multi = pycurl.CurlMulti()
+
+    def start(self, opts):
+        ''' Start download of job 'opts'
+        '''
+        fo = _AsyncCurlFile(opts.url, opts.filename, opts)
+        self.running[fo.curl_obj] = fo
+        self.multi.add_handle(fo.curl_obj)
+
+    def perform(self):
+        ''' Run downloads, return finished ones.
+        '''
+        while self.multi.perform()[0] == pycurl.E_CALL_MULTI_PERFORM:
+            pass
+        ret = []
+        _, finished, failed = self.multi.info_read()
+        for curl in finished + failed:
+            curl_err = None
+            if type(curl) == tuple:
+                curl, code, msg = curl
+                curl_err = pycurl.error(code, msg)
+            self.multi.remove_handle(curl)
+            fo = self.running.pop(curl)
+            try: ug_err = None; fo._do_perform_exc(curl_err)
+            except URLGrabError, ug_err: pass
+            fo._do_close_fo()
+            ret.append((fo.opts, ug_err, fo._amount_read))
+        return ret
+
+    def abort(self):
+        while self.running:
+            curl, fo = self.running.popitem()
+            self.multi.remove_handle(curl)
+            fo._do_close_fo()
+            os.unlink(fo.opts.filename)
+
 
 #####################################################################
 #  High level async API
@@ -1880,16 +1921,13 @@ def parallel_wait(meter = 'text'):
             meter = TextMultiFileMeter()
         meter.start(count, total)
 
-    running = {}
-    multi = pycurl.CurlMulti()
+    dl = _DirectDownloader()
 
     def start(opts, tries):
         opts.tries = tries
         opts.progress_obj = meter and meter.newMeter()
         if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url)
-        fo = _AsyncCurlFile(opts.url, opts.filename, opts)
-        running[fo.curl_obj] = fo
-        multi.add_handle(fo.curl_obj)
+        dl.start(opts)
 
     def start_next(opts):
         key, limit = opts.async
@@ -1898,25 +1936,7 @@ def parallel_wait(meter = 'text'):
             start(queue[pos], 1)
 
     def perform():
-        while multi.perform()[0] == pycurl.E_CALL_MULTI_PERFORM:
-            pass
-        q, finished, failed = multi.info_read()
-        for curl in finished + failed:
-
-            # curl layer
-            curl_err = None
-            if type(curl) == tuple:
-                curl, code, msg = curl
-                curl_err = pycurl.error(code, msg)
-            multi.remove_handle(curl)
-
-            # grabber layer
-            fo = running.pop(curl); opts = fo.opts
-            try: ug_err = None; fo._do_perform_exc(curl_err)
-            except URLGrabError, ug_err: pass
-            fo._do_close_fo()
-
-            # do progress before callbacks to show retries
+        for opts, ug_err, _amount_read in dl.perform():
             if meter:
                 m = opts.progress_obj
                 m.basename = os.path.basename(opts.filename)
@@ -1924,8 +1944,8 @@ def parallel_wait(meter = 'text'):
                     m.failure(ug_err.args[1])
                 else:
                     # file size might have changed
-                    meter.re.total += fo._amount_read - opts.size
-                    m.end(fo._amount_read)
+                    meter.re.total += _amount_read - opts.size
+                    m.end(_amount_read)
                 meter.removeMeter(m)
 
             if ug_err is None:
@@ -1987,15 +2007,11 @@ def parallel_wait(meter = 'text'):
                 start(opts, 1)
         # now 'limit' is used as 'pos', index
         # of the first request not started yet.
-        while running:
+        while dl.running:
             perform()
 
     finally:
-        while running:
-            curl, fo = running.popitem()
-            multi.remove_handle(curl)
-            fo._do_close_fo()
-            os.unlink(fo.opts.filename)
+        dl.abort()
         _async.clear()
         if meter:
             meter.end()
-- 
1.7.4.4