opts.async = (key, limit):
    async urlgrab() with conn limiting.

parallel_wait():
    wait until all grabs have finished.
---
 urlgrabber/grabber.py |  169 +++++++++++++++++++++++++++++++++++++++++++++++++
 urlgrabber/mirror.py  |    4 +
 2 files changed, 173 insertions(+), 0 deletions(-)

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index a7c847b..644b431 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -259,6 +259,12 @@ GENERAL ARGUMENTS (kwargs)
     What type of name to IP resolving to use, default is to do both IPV4 and
     IPV6.
 
+  async = (key, limit)
+
+    When this option is set, the urlgrab() is not processed immediately
+    but queued.  parallel_wait() then processes grabs in parallel, limiting
+    the number of connections in each 'key' group to at most 'limit'.
+
 
 RETRY RELATED ARGUMENTS
 
@@ -882,6 +888,7 @@ class URLGrabberOptions:
         self.size = None # if we know how big the thing we're getting is going
                          # to be. this is ultimately a MAXIMUM size for the file
         self.max_header_size = 2097152 #2mb seems reasonable for maximum header size
+        self.async = None # blocking by default
         
     def __repr__(self):
         return self.format()
@@ -1019,6 +1026,15 @@ class URLGrabber(object):
                     _callback(opts.checkfunc, obj)
                 return path
         
+        if opts.async:
+            opts.url = url
+            opts.filename = filename
+            opts.size = int(opts.size or 0)
+            key, limit = opts.async
+            limit, queue = _async.setdefault(key, [limit, []])
+            queue.append(opts)
+            return filename
+
         def retryfunc(opts, url, filename):
             fo = PyCurlFileObject(url, filename, opts)
             try:
@@ -1833,6 +1849,159 @@ def retrygrab(url, filename=None, copy_local=0, close_connection=0,
 
         
 #####################################################################
+#  Downloader
+#####################################################################
+
+class _AsyncCurlFile(PyCurlFileObject):  # file object whose handle parallel_wait() adds to its CurlMulti
+    def _do_open(self):  # set up the handle and the output file, but do not perform the transfer here
+        self.curl_obj = pycurl.Curl() # one private handle per request; don't reuse _curl_cache
+        self._set_opts()  # inherited helper; presumably applies the request options to the handle -- confirm
+        self._do_open_fo() # open the file but don't grab
+
+
+#####################################################################
+#  High level async API
+#####################################################################
+
+_async = {}
+
+def parallel_wait(meter = 'text'):  # meter: 'text' (built-in text meter), a falsy value (no progress), or an object with start()/newMeter()/removeMeter()/end()
+    '''Process queued requests in parallel.
+    '''
+
+    if meter:
+        count = total = 0  # number of queued requests and the sum of their expected sizes
+        for limit, queue in _async.values():
+            for opts in queue:
+                count += 1
+                total += opts.size
+        if meter == 'text':
+            from progress import TextMultiFileMeter
+            meter = TextMultiFileMeter()  # replace the 'text' marker with a real meter object
+        meter.start(count, total)
+
+    running = {}  # maps each active pycurl handle to its _AsyncCurlFile
+    multi = pycurl.CurlMulti()
+
+    def start(opts, tries):  # begin attempt number 'tries' of the request described by 'opts'
+        opts.tries = tries
+        opts.progress_obj = meter and meter.newMeter()  # stays the falsy 'meter' value when progress is disabled
+        if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, 
opts.url)
+        fo = _AsyncCurlFile(opts.url, opts.filename, opts)
+        running[fo.curl_obj] = fo
+        multi.add_handle(fo.curl_obj)
+
+    def start_next(opts):  # opts' request is done with its connection: start the next pending request of the same key group
+        key, limit = opts.async
+        pos, queue = _async[key]; _async[key][0] += 1  # the stored position is advanced even when nothing is left to start
+        if pos < len(queue):
+            start(queue[pos], 1)
+
+    def perform():  # drive the transfers, then handle every request that info_read() reports as completed
+        while multi.perform()[0] == pycurl.E_CALL_MULTI_PERFORM:
+            pass
+        q, finished, failed = multi.info_read()
+        for curl in finished + failed:
+
+            # curl layer
+            curl_err = None
+            if type(curl) == tuple:  # tuple entries are unpacked as (handle, code, msg); other entries are bare handles
+                curl, code, msg = curl
+                curl_err = pycurl.error(code, msg)
+            multi.remove_handle(curl)
+
+            # grabber layer
+            fo = running.pop(curl); opts = fo.opts
+            try: ug_err = None; fo._do_perform_exc(curl_err)  # presumably raises URLGrabError for a failed transfer -- confirm
+            except URLGrabError, ug_err: pass  # keep the error in ug_err; it is examined below
+            fo._do_close_fo()
+
+            # do progress before callbacks to show retries
+            if meter:
+                m = opts.progress_obj
+                m.basename = os.path.basename(opts.filename)
+                if ug_err:
+                    m.failure(ug_err.args[1])
+                else:
+                    # file size might have changed
+                    meter.re.total += fo._amount_read - opts.size
+                    m.end(fo._amount_read)
+                meter.removeMeter(m)
+
+            if ug_err is None:
+                if DEBUG: DEBUG.info('success')
+                if opts.checkfunc:
+                    try: _callback(opts.checkfunc, opts)
+                    except URLGrabError, ug_err:  # a checkfunc failure falls through to the failure handling below
+                        if meter:
+                            meter.numfiles += 1  # add the request back to the meter's file count and byte total
+                            meter.re.total += opts.size
+                if ug_err is None:
+                    start_next(opts)
+                    continue
+
+            if DEBUG: DEBUG.info('failure: %s', ug_err)
+            retry = opts.retry or 0
+            if opts.failure_callback:
+                opts.exception = ug_err
+                try: _callback(opts.failure_callback, opts)
+                except URLGrabError, ug_err: retry = 0 # no retries
+            if opts.tries < retry and ug_err.args[0] in opts.retrycodes:
+                start(opts, opts.tries + 1) # simple retry
+                continue
+            start_next(opts)  # no retry for this request: let the next request of its key group start
+
+            if hasattr(opts, 'mirror_group'):  # request carries mirror-group info: try to move it to another mirror
+                mg, gr, mirrorchoice = opts.mirror_group
+                opts.exception = ug_err
+                opts.mirror = mirrorchoice['mirror']
+                opts.relative_url = gr.url
+                try:
+                    mg._failure(gr, opts)
+                    mirrorchoice = mg._get_mirror(gr)
+                    opts.mirror_group = mg, gr, mirrorchoice
+                except URLGrabError, ug_err: pass  # no usable mirror: fall through to the final failure handling
+                else:
+                    # use new mirrorchoice
+                    key = mirrorchoice['mirror']
+                    limit = mirrorchoice.get('max_connections') or 3
+                    opts.async = key, limit
+                    opts.url = mg._join_url(mirrorchoice['mirror'], gr.url)
+
+                    # add request to the new queue
+                    pos, queue = _async.setdefault(key, [limit, []])
+                    queue[pos:pos] = [opts] # inserting at head
+                    if len(queue) <= pos:  # pos already lies past the end of the queue: start this request right away
+                        start(opts, 1)
+                    continue
+
+            # urlgrab failed
+            if not opts.failfunc:
+                raise ug_err
+            opts.exception = ug_err
+            _callback(opts.failfunc, opts)
+
+    try:
+        for limit, queue in _async.values():
+            for opts in queue[:limit]:  # start at most 'limit' requests per key group
+                start(opts, 1)
+        # now 'limit' is used as 'pos', index
+        # of the first request not started yet.
+        while running:
+            perform()
+
+    finally:
+        while running:  # non-empty only if the code above exited via an exception: abort what is still in flight
+            curl, fo = running.popitem()
+            multi.remove_handle(curl)
+            fo._do_close_fo()
+            os.unlink(fo.opts.filename)  # delete the output file of the aborted request
+        _async.clear()
+        if meter:
+            meter.end()
+
+
+#####################################################################
 #  TESTING
 def _main_test():
     try: url, filename = sys.argv[1:3]
diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py
index 5f0120f..47b6f89 100644
--- a/urlgrabber/mirror.py
+++ b/urlgrabber/mirror.py
@@ -396,6 +396,10 @@ class MirrorGroup:
             # - blocking urlgrab() ignores failfunc
             # - async urlgrab() can iterate mirrors
             kwargs['mirror_group'] = self, gr, mirrorchoice
+            if kw.get('async'):
+                key = mirrorchoice['mirror']
+                limit = mirrorchoice.get('max_connections') or 3
+                kwargs['async'] = key, limit
             try:
                 return func_ref( *(fullurl,), **kwargs )
             except URLGrabError, e:
-- 
1.7.4.4

_______________________________________________
Yum-devel mailing list
Yum-devel@lists.baseurl.org
http://lists.baseurl.org/mailman/listinfo/yum-devel

Reply via email to