Add a wrapper around the code that touches curl.
(preparing for external downloader)
---
 urlgrabber/grabber.py |   80 +++++++++++++++++++++++++++++-------------------
 1 files changed, 48 insertions(+), 32 deletions(-)

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 644b431..b64c943 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1858,6 +1858,47 @@ class _AsyncCurlFile(PyCurlFileObject):
         self._set_opts()
         self._do_open_fo() # open the file but don't grab
 
+class _DirectDownloader:
+    def __init__(self):
+        ''' A downloader context.
+        '''
+        self.running = {}
+        self.multi = pycurl.CurlMulti()
+
+    def start(self, opts):
+        ''' Start download of job 'opts'
+        '''
+        fo = _AsyncCurlFile(opts.url, opts.filename, opts)
+        self.running[fo.curl_obj] = fo
+        self.multi.add_handle(fo.curl_obj)
+
+    def perform(self):
+        ''' Run downloads, return finished ones.
+        '''
+        while self.multi.perform()[0] == pycurl.E_CALL_MULTI_PERFORM:
+            pass
+        ret = []
+        _, finished, failed = self.multi.info_read()
+        for curl in finished + failed:
+            curl_err = None
+            if type(curl) == tuple:
+                curl, code, msg = curl
+                curl_err = pycurl.error(code, msg)
+            self.multi.remove_handle(curl)
+            fo = self.running.pop(curl)
+            try: ug_err = None; fo._do_perform_exc(curl_err)
+            except URLGrabError, ug_err: pass
+            fo._do_close_fo()
+            ret.append((fo.opts, ug_err, fo._amount_read))
+        return ret
+
+    def abort(self):
+        while self.running:
+            curl, fo = self.running.popitem()
+            self.multi.remove_handle(curl)
+            fo._do_close_fo()
+            os.unlink(fo.opts.filename)
+
 
 #####################################################################
 #  High level async API
@@ -1880,16 +1921,13 @@ def parallel_wait(meter = 'text'):
             meter = TextMultiFileMeter()
         meter.start(count, total)
 
-    running = {}
-    multi = pycurl.CurlMulti()
+    dl = _DirectDownloader()
 
     def start(opts, tries):
         opts.tries = tries
         opts.progress_obj = meter and meter.newMeter()
         if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, 
opts.url)
-        fo = _AsyncCurlFile(opts.url, opts.filename, opts)
-        running[fo.curl_obj] = fo
-        multi.add_handle(fo.curl_obj)
+        dl.start(opts)
 
     def start_next(opts):
         key, limit = opts.async
@@ -1898,25 +1936,7 @@ def parallel_wait(meter = 'text'):
             start(queue[pos], 1)
 
     def perform():
-        while multi.perform()[0] == pycurl.E_CALL_MULTI_PERFORM:
-            pass
-        q, finished, failed = multi.info_read()
-        for curl in finished + failed:
-
-            # curl layer
-            curl_err = None
-            if type(curl) == tuple:
-                curl, code, msg = curl
-                curl_err = pycurl.error(code, msg)
-            multi.remove_handle(curl)
-
-            # grabber layer
-            fo = running.pop(curl); opts = fo.opts
-            try: ug_err = None; fo._do_perform_exc(curl_err)
-            except URLGrabError, ug_err: pass
-            fo._do_close_fo()
-
-            # do progress before callbacks to show retries
+        for opts, ug_err, _amount_read in dl.perform():
             if meter:
                 m = opts.progress_obj
                 m.basename = os.path.basename(opts.filename)
@@ -1924,8 +1944,8 @@ def parallel_wait(meter = 'text'):
                     m.failure(ug_err.args[1])
                 else:
                     # file size might have changed
-                    meter.re.total += fo._amount_read - opts.size
-                    m.end(fo._amount_read)
+                    meter.re.total += _amount_read - opts.size
+                    m.end(_amount_read)
                 meter.removeMeter(m)
 
             if ug_err is None:
@@ -1987,15 +2007,11 @@ def parallel_wait(meter = 'text'):
                 start(opts, 1)
         # now 'limit' is used as 'pos', index
         # of the first request not started yet.
-        while running:
+        while dl.running:
             perform()
 
     finally:
-        while running:
-            curl, fo = running.popitem()
-            multi.remove_handle(curl)
-            fo._do_close_fo()
-            os.unlink(fo.opts.filename)
+        dl.abort()
         _async.clear()
         if meter:
             meter.end()
-- 
1.7.4.4

_______________________________________________
Yum-devel mailing list
Yum-devel@lists.baseurl.org
http://lists.baseurl.org/mailman/listinfo/yum-devel

Reply via email to