opts.async = (key, limit):
async urlgrab() with conn limiting.
parallel_wait():
wait until all grabs have finished.
---
urlgrabber/grabber.py | 183 +++++++++++++++++++++++++++++++++++++++++++++++++
urlgrabber/mirror.py | 11 +++
2 files changed, 194 insertions(+), 0 deletions(-)
diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index b8574bc..edac07b 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -259,6 +259,12 @@ GENERAL ARGUMENTS (kwargs)
What type of name to IP resolving to use, default is to do both IPV4 and
IPV6.
+ async = (key, limit)
+
+ When this option is set, the urlgrab() is not processed immediately
+ but queued. parallel_wait() then processes grabs in parallel, limiting
+ the number of connections in each 'key' group to at most 'limit'.
+
RETRY RELATED ARGUMENTS
@@ -882,6 +888,7 @@ class URLGrabberOptions:
self.size = None # if we know how big the thing we're getting is going
# to be. this is ultimately a MAXIMUM size for the
file
self.max_header_size = 2097152 #2mb seems reasonable for maximum
header size
+ self.async = None # blocking by default
def __repr__(self):
return self.format()
@@ -1022,6 +1029,15 @@ class URLGrabber(object):
_run_callback(opts.checkfunc, obj)
return path
+ if opts.async:
+ opts.url = url
+ opts.filename = filename
+ opts.size = int(opts.size or 0)
+ key, limit = opts.async
+ limit, queue = _async.setdefault(key, [limit, []])
+ queue.append(opts)
+ return filename
+
def retryfunc(opts, url, filename):
fo = PyCurlFileObject(url, filename, opts)
try:
@@ -1836,6 +1852,173 @@ def retrygrab(url, filename=None, copy_local=0,
close_connection=0,
#####################################################################
+# Downloader
+#####################################################################
+
+class _AsyncCurlFile(PyCurlFileObject):
+ def _do_open(self):
+ self.curl_obj = pycurl.Curl() # don't reuse _curl_cache
+ self._set_opts()
+ self._do_open_fo() # open the file but don't grab
+
+class _DirectDownloader:
+ def __init__(self):
+ ''' A downloader context.
+ '''
+ self.running = {}
+ self.multi = pycurl.CurlMulti()
+
+ def start(self, opts):
+ ''' Start download of job 'opts'
+ '''
+ fo = _AsyncCurlFile(opts.url, opts.filename, opts)
+ self.running[fo.curl_obj] = fo
+ self.multi.add_handle(fo.curl_obj)
+
+ def perform(self):
+ ''' Run downloads, return finished ones.
+ '''
+ while self.multi.perform()[0] == pycurl.E_CALL_MULTI_PERFORM:
+ pass
+ ret = []
+ _, finished, failed = self.multi.info_read()
+ for curl in finished + failed:
+ curl_err = None
+ if type(curl) == tuple:
+ curl, code, msg = curl
+ curl_err = pycurl.error(code, msg)
+ self.multi.remove_handle(curl)
+ fo = self.running.pop(curl)
+ try: ug_err = None; fo._do_perform_exc(curl_err)
+ except URLGrabError, ug_err: pass
+ fo._do_close_fo()
+ ret.append((fo.opts, ug_err, fo._amount_read))
+ return ret
+
+ def abort(self):
+ while self.running:
+ curl, fo = self.running.popitem()
+ self.multi.remove_handle(curl)
+ fo._do_close_fo()
+
+
+#####################################################################
+# High level async API
+#####################################################################
+
+_async = {}
+
+def parallel_wait(meter = 'text'):
+ '''Process queued requests in parallel.
+ '''
+
+ if meter:
+ count = total = 0
+ for limit, queue in _async.values():
+ for opts in queue:
+ count += 1
+ total += opts.size
+ if meter == 'text':
+ from progress import TextMultiFileMeter
+ meter = TextMultiFileMeter()
+ meter.start(count, total)
+
+ dl = _DirectDownloader()
+
+ def start(opts, tries):
+ opts.tries = tries
+ opts.progress_obj = meter and meter.newMeter()
+ if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry,
opts.url)
+ dl.start(opts)
+
+ def start_next(opts):
+ key, limit = opts.async
+ pos, queue = _async[key]; _async[key][0] += 1
+ if pos < len(queue):
+ start(queue[pos], 1)
+
+ def perform():
+ for opts, ug_err, _amount_read in dl.perform():
+ if meter:
+ m = opts.progress_obj
+ m.basename = os.path.basename(opts.filename)
+ if ug_err:
+ m.failure(ug_err.args[1])
+ else:
+ # file size might have changed
+ meter.re.total += _amount_read - opts.size
+ m.end(_amount_read)
+ meter.removeMeter(m)
+
+ if ug_err is None:
+ if DEBUG: DEBUG.info('success')
+ if opts.checkfunc:
+ try: _run_callback(opts.checkfunc, opts)
+ except URLGrabError, ug_err:
+ if meter:
+ meter.numfiles += 1
+ meter.re.total += opts.size
+ if ug_err is None:
+ start_next(opts)
+ continue
+
+ if DEBUG: DEBUG.info('failure: %s', ug_err)
+ retry = opts.retry or 0
+ if opts.failure_callback:
+ opts.exception = ug_err
+ try: _run_callback(opts.failure_callback, opts)
+ except URLGrabError, ug_err: retry = 0 # no retries
+ if opts.tries < retry and ug_err.args[0] in opts.retrycodes:
+ start(opts, opts.tries + 1) # simple retry
+ continue
+ start_next(opts)
+
+ if hasattr(opts, 'mirror_group'):
+ mg, gr, mirrorchoice = opts.mirror_group
+ opts.exception = ug_err
+ opts.mirror = mirrorchoice['mirror']
+ opts.relative_url = gr.url
+ try:
+ mg._failure(gr, opts)
+ mirrorchoice = mg._get_mirror(gr)
+ opts.mirror_group = mg, gr, mirrorchoice
+ except URLGrabError, ug_err: pass
+ else:
+ # use new mirrorchoice
+ key = mirrorchoice['mirror']
+ kwargs = mirrorchoice.get('kwargs', {})
+ limit = kwargs.get('max_connections') or 3
+ opts.async = key, limit
+ opts.url = mg._join_url(mirrorchoice['mirror'], gr.url)
+
+ # add request to the new queue
+ pos, queue = _async.setdefault(key, [limit, []])
+ queue[pos:pos] = [opts] # inserting at head
+ if len(queue) <= pos:
+ start(opts, 1)
+ continue
+
+ # urlgrab failed
+ opts.exception = ug_err
+ _run_callback(opts.failfunc, opts)
+
+ try:
+ for limit, queue in _async.values():
+ for opts in queue[:limit]:
+ start(opts, 1)
+ # now 'limit' is used as 'pos', index
+ # of the first request not started yet.
+ while dl.running:
+ perform()
+
+ finally:
+ dl.abort()
+ _async.clear()
+ if meter:
+ meter.end()
+
+
+#####################################################################
# TESTING
def _main_test():
try: url, filename = sys.argv[1:3]
diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py
index 575ef7b..41c99f5 100644
--- a/urlgrabber/mirror.py
+++ b/urlgrabber/mirror.py
@@ -76,6 +76,9 @@ CUSTOMIZATION
'grabber' is omitted, the default grabber will be used. If
kwargs are omitted, then (duh) they will not be used.
+ kwarg 'max_connections' is used to store the max connection
+ limit of this mirror, and to update the value of the 'async' option.
+
3) Pass keyword arguments when instantiating the mirror group.
See, for example, the failure_callback argument.
@@ -392,6 +395,14 @@ class MirrorGroup:
grabber = mirrorchoice.get('grabber') or self.grabber
func_ref = getattr(grabber, func)
if DEBUG: DEBUG.info('MIRROR: trying %s -> %s', url, fullurl)
+ if kw.get('async'):
+ # 'async' option to the 1st mirror
+ key = mirrorchoice['mirror']
+ limit = kwargs.get('max_connections') or 3
+ kwargs['async'] = key, limit
+ # async code iterates mirrors and calls failfunc
+ kwargs['mirror_group'] = self, gr, mirrorchoice
+ kwargs['failfunc'] = gr.kw.get('failfunc', _do_raise)
try:
return func_ref( *(fullurl,), **kwargs )
except URLGrabError, e:
--
1.7.4.4
_______________________________________________
Yum-devel mailing list
[email protected]
http://lists.baseurl.org/mailman/listinfo/yum-devel