Add an 'external = True' flag to parallel_wait() to relay download requests to an external downloader process. --- urlgrabber/grabber.py | 80 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 78 insertions(+), 2 deletions(-)
import subprocess

class _ExternalDownloader:
    '''Relay download requests to an external downloader process.

    The slave is a re-exec of this module with the 'DOWNLOADER' argument
    (handled by download_process()).  Requests are written to its stdin,
    one per line; progress reports and final results are read back from
    its stdout.
    '''

    def __init__(self):
        # Spawn the slave; communicate over a pair of pipes.
        self.popen = subprocess.Popen(
            ['/usr/bin/python', __file__, 'DOWNLOADER'],
            stdin = subprocess.PIPE,
            stdout = subprocess.PIPE,
        )
        # Raw fds: we use os.read/os.write to avoid stdio buffering.
        self.stdin = self.popen.stdin.fileno()
        self.stdout = self.popen.stdout.fileno()
        # Map of job id -> grabber options for requests still in flight.
        self.running = {}
        # Monotonic job-id counter.
        self.cnt = 0

    # List of options we pass to the downloader.  Anything not listed
    # here (callbacks, progress objects, ...) stays on our side.
    _options = (
        'url', 'filename',
        'timeout', 'close_connection', 'keepalive',
        'throttle', 'bandwidth', 'range', 'reget',
        'user_agent', 'http_headers', 'ftp_headers',
        'proxies', 'prefix', 'quote',
        'username', 'password',
        'ssl_ca_cert',
        'ssl_cert', 'ssl_cert_type',
        'ssl_key', 'ssl_key_type',
        'ssl_key_pass',
        'ssl_verify_peer', 'ssl_verify_host',
        'size', 'max_header_size', 'ip_resolve',
    )

    def start(self, opts):
        '''Queue one download request described by *opts*.

        Each option value is JSON-encoded, then url-quoted so the whole
        request fits on a single space-separated line of the pipe
        protocol.
        '''
        arg = []
        for k in self._options:
            v = getattr(opts, k)
            if v is None: continue
            v = simplejson.dumps(v)
            v = urllib.quote(v)
            arg.append('%s=%s' % (k, v))
        arg = ' '.join(arg)
        if DEBUG: DEBUG.info('external: %s', arg)

        self.cnt += 1
        self.running[self.cnt] = opts
        os.write(self.stdin, arg + '\n')

    def perform(self):
        '''Read and dispatch reports from the downloader.

        Each line is 'cnt amount_read [errcode [errmsg]]'.  Two fields
        mean a progress update (relayed to the request's progress_obj);
        three or four fields mean the job is finished.

        Returns a list of (opts, err, amount_read) tuples for finished
        jobs, where err is None on success or a URLGrabError.
        '''
        ret = []
        buf = os.read(self.stdout, 4096)
        while buf:
            try: line, buf = buf.split('\n', 1)
            except ValueError:
                # Incomplete line at the end of the buffer -
                # keep reading until the newline arrives.
                buf += os.read(self.stdout, 4096)
                continue
            # Parse downloader output.
            line = line.split(' ', 3)
            cnt, _amount_read = map(int, line[:2])
            if len(line) == 2:
                # Progress report: relay to the progress meter, if any.
                m = self.running[cnt].progress_obj
                if m:
                    if not m.last_update_time: m.start()
                    m.update(_amount_read)
                continue
            # Job done.
            opts = self.running.pop(cnt)
            err = None
            if line[2] != 'OK':
                # BUGFIX: the message field is optional.  The original
                # read line[3] unconditionally and raised IndexError
                # when the downloader reported an error code with no
                # message; default to an empty message instead.
                msg = line[3] if len(line) > 3 else ''
                err = URLGrabError(int(line[2]), msg)
            ret.append((opts, err, _amount_read))
        return ret

    def abort(self):
        '''Close both pipe ends and reap the child process.'''
        self.popen.stdin.close()
        self.popen.stdout.close()
        self.popen.wait()