opts.async = (key, limit):
    async urlgrab() with conn limiting.

parallel_wait():
    wait until all grabs have finished.
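
Example (a rough sketch; the URL, file names and the 'dl1' key are made up):

    from urlgrabber.grabber import urlgrab, parallel_wait

    for name in ('a.rpm', 'b.rpm', 'c.rpm'):
        # queue the download; at most 3 connections for the 'dl1' key
        urlgrab('http://dl1.example.com/pkgs/' + name, name,
                async=('dl1', 3))
    parallel_wait()   # runs the queued grabs, 3 at a time per key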
---
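A rough sketch of the MirrorGroup side (made-up mirror hosts; the
per-mirror 'max_connections' kwarg in the mirror.py hunk below sets the
connection limit, mirrors without it default to 3):

    from urlgrabber.grabber import URLGrabber, parallel_wait
    from urlgrabber.mirror import MirrorGroup

    mirrors = [{'mirror': 'http://m1.example.com/pub',
                'kwargs': {'max_connections': 5}},
               {'mirror': 'http://m2.example.com/pub'}]
    mg = MirrorGroup(URLGrabber(), mirrors)
    for name in ('a.rpm', 'b.rpm'):
        # any true value queues the grab; mirror.py replaces it with
        # (mirror, max_connections) for the mirror it picks
        mg.urlgrab('pkgs/' + name, name, async=True)
    parallel_wait()
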
 urlgrabber/grabber.py |  183 +++++++++++++++++++++++++++++++++++++++++++++++++
 urlgrabber/mirror.py  |   11 +++
 2 files changed, 194 insertions(+), 0 deletions(-)

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index b8574bc..edac07b 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -259,6 +259,12 @@ GENERAL ARGUMENTS (kwargs)
     What type of name to IP resolving to use, default is to do both IPV4 and
     IPV6.
 
+  async = (key, limit)
+
+    When this option is set, the urlgrab() request is not processed
+    immediately but queued.  parallel_wait() then processes the queued
+    requests in parallel, limiting the number of connections in each
+    'key' group to at most 'limit'.
+
 
 RETRY RELATED ARGUMENTS
 
@@ -882,6 +888,7 @@ class URLGrabberOptions:
         self.size = None # if we know how big the thing we're getting is going
                          # to be. this is ultimately a MAXIMUM size for the file
         self.max_header_size = 2097152 #2mb seems reasonable for maximum header size
+        self.async = None # blocking by default
         
     def __repr__(self):
         return self.format()
@@ -1022,6 +1029,15 @@ class URLGrabber(object):
                     _run_callback(opts.checkfunc, obj)
                 return path
         
+        if opts.async:
+            opts.url = url
+            opts.filename = filename
+            opts.size = int(opts.size or 0)
+            key, limit = opts.async
+            limit, queue = _async.setdefault(key, [limit, []])
+            queue.append(opts)
+            return filename
+
         def retryfunc(opts, url, filename):
             fo = PyCurlFileObject(url, filename, opts)
             try:
@@ -1836,6 +1852,173 @@ def retrygrab(url, filename=None, copy_local=0, close_connection=0,
 
         
 #####################################################################
+#  Downloader
+#####################################################################
+
+class _AsyncCurlFile(PyCurlFileObject):
+    def _do_open(self):
+        self.curl_obj = pycurl.Curl() # don't reuse _curl_cache
+        self._set_opts()
+        self._do_open_fo() # open the file but don't grab
+
+class _DirectDownloader:
+    def __init__(self):
+        ''' A downloader context.
+        '''
+        self.running = {}
+        self.multi = pycurl.CurlMulti()
+
+    def start(self, opts):
+        ''' Start download of job 'opts'
+        '''
+        fo = _AsyncCurlFile(opts.url, opts.filename, opts)
+        self.running[fo.curl_obj] = fo
+        self.multi.add_handle(fo.curl_obj)
+
+    def perform(self):
+        ''' Run downloads, return finished ones.
+        '''
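+        # E_CALL_MULTI_PERFORM just means pycurl wants perform() called
+        # again right away before waiting for more data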
+        while self.multi.perform()[0] == pycurl.E_CALL_MULTI_PERFORM:
+            pass
+        ret = []
+        _, finished, failed = self.multi.info_read()
+        for curl in finished + failed:
+            curl_err = None
+            if type(curl) == tuple:
+                curl, code, msg = curl
+                curl_err = pycurl.error(code, msg)
+            self.multi.remove_handle(curl)
+            fo = self.running.pop(curl)
+            try: ug_err = None; fo._do_perform_exc(curl_err)
+            except URLGrabError, ug_err: pass
+            fo._do_close_fo()
+            ret.append((fo.opts, ug_err, fo._amount_read))
+        return ret
+
+    def abort(self):
+        while self.running:
+            curl, fo = self.running.popitem()
+            self.multi.remove_handle(curl)
+            fo._do_close_fo()
+
+
+#####################################################################
+#  High level async API
+#####################################################################
+
+_async = {}
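+# requests queued by URLGrabber.urlgrab(): key -> [limit, list of opts]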
+
+def parallel_wait(meter = 'text'):
+    '''Process queued requests in parallel.
+    '''
+
+    if meter:
+        count = total = 0
+        for limit, queue in _async.values():
+            for opts in queue:
+                count += 1
+                total += opts.size
+        if meter == 'text':
+            from progress import TextMultiFileMeter
+            meter = TextMultiFileMeter()
+        meter.start(count, total)
+
+    dl = _DirectDownloader()
+
+    def start(opts, tries):
+        opts.tries = tries
+        opts.progress_obj = meter and meter.newMeter()
+        if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url)
+        dl.start(opts)
+
+    def start_next(opts):
+        key, limit = opts.async
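+        # once downloads are running, _async[key][0] no longer holds the
+        # limit but the index of the first request not started yet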
+        pos, queue = _async[key]; _async[key][0] += 1
+        if pos < len(queue):
+            start(queue[pos], 1)
+
+    def perform():
+        for opts, ug_err, _amount_read in dl.perform():
+            if meter:
+                m = opts.progress_obj
+                m.basename = os.path.basename(opts.filename)
+                if ug_err:
+                    m.failure(ug_err.args[1])
+                else:
+                    # file size might have changed
+                    meter.re.total += _amount_read - opts.size
+                    m.end(_amount_read)
+                meter.removeMeter(m)
+
+            if ug_err is None:
+                if DEBUG: DEBUG.info('success')
+                if opts.checkfunc:
+                    try: _run_callback(opts.checkfunc, opts)
+                    except URLGrabError, ug_err:
+                        if meter:
+                            meter.numfiles += 1
+                            meter.re.total += opts.size
+                if ug_err is None:
+                    start_next(opts)
+                    continue
+
+            if DEBUG: DEBUG.info('failure: %s', ug_err)
+            retry = opts.retry or 0
+            if opts.failure_callback:
+                opts.exception = ug_err
+                try: _run_callback(opts.failure_callback, opts)
+                except URLGrabError, ug_err: retry = 0 # no retries
+            if opts.tries < retry and ug_err.args[0] in opts.retrycodes:
+                start(opts, opts.tries + 1) # simple retry
+                continue
+            start_next(opts)
+
+            if hasattr(opts, 'mirror_group'):
+                mg, gr, mirrorchoice = opts.mirror_group
+                opts.exception = ug_err
+                opts.mirror = mirrorchoice['mirror']
+                opts.relative_url = gr.url
+                try:
+                    mg._failure(gr, opts)
+                    mirrorchoice = mg._get_mirror(gr)
+                    opts.mirror_group = mg, gr, mirrorchoice
+                except URLGrabError, ug_err: pass
+                else:
+                    # use new mirrorchoice
+                    key = mirrorchoice['mirror']
+                    kwargs = mirrorchoice.get('kwargs', {})
+                    limit = kwargs.get('max_connections') or 3
+                    opts.async = key, limit
+                    opts.url = mg._join_url(mirrorchoice['mirror'], gr.url)
+
+                    # add request to the new queue
+                    pos, queue = _async.setdefault(key, [limit, []])
+                    queue[pos:pos] = [opts] # inserting at head
+                    if len(queue) <= pos:
+                        start(opts, 1)
+                    continue
+
+            # urlgrab failed
+            opts.exception = ug_err
+            _run_callback(opts.failfunc, opts)
+
+    try:
+        for limit, queue in _async.values():
+            for opts in queue[:limit]:
+                start(opts, 1)
+        # now 'limit' is used as 'pos', index
+        # of the first request not started yet.
+        while dl.running:
+            perform()
+
+    finally:
+        dl.abort()
+        _async.clear()
+        if meter:
+            meter.end()
+
+
+#####################################################################
 #  TESTING
 def _main_test():
     try: url, filename = sys.argv[1:3]
diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py
index 575ef7b..41c99f5 100644
--- a/urlgrabber/mirror.py
+++ b/urlgrabber/mirror.py
@@ -76,6 +76,9 @@ CUSTOMIZATION
        'grabber' is omitted, the default grabber will be used.  If
        kwargs are omitted, then (duh) they will not be used.
 
+       The kwarg 'max_connections' stores the maximum connection limit
+       of this mirror and is used to update the value of the 'async'
+       option.
+
     3) Pass keyword arguments when instantiating the mirror group.
        See, for example, the failure_callback argument.
 
@@ -392,6 +395,14 @@ class MirrorGroup:
             grabber = mirrorchoice.get('grabber') or self.grabber
             func_ref = getattr(grabber, func)
             if DEBUG: DEBUG.info('MIRROR: trying %s -> %s', url, fullurl)
+            if kw.get('async'):
+                # 'async' option to the 1st mirror
+                key = mirrorchoice['mirror']
+                limit = kwargs.get('max_connections') or 3
+                kwargs['async'] = key, limit
+                # async code iterates mirrors and calls failfunc
+                kwargs['mirror_group'] = self, gr, mirrorchoice
+                kwargs['failfunc'] = gr.kw.get('failfunc', _do_raise)
             try:
                 return func_ref( *(fullurl,), **kwargs )
             except URLGrabError, e:
-- 
1.7.4.4
