Zooko O'Whielacronx has proposed merging lp:~dkeeney/txaws/add-multipart-upload into lp:txaws.
Requested reviews: txAWS Developers (txaws-dev) Related bugs: Bug #783801 in txAWS: "s3 multipart upload support" https://bugs.launchpad.net/txaws/+bug/783801 For more details, see: https://code.launchpad.net/~dkeeney/txaws/add-multipart-upload/+merge/76647 Okay, I read the patches on this branch: lp:~dkeeney/txaws/add-multipart-upload . The only problem I noticed is that the most recent changes -- http://bazaar.launchpad.net/~dkeeney/txaws/add-multipart-upload/revision/77 , http://bazaar.launchpad.net/~dkeeney/txaws/add-multipart-upload/revision/78 , and http://bazaar.launchpad.net/~dkeeney/txaws/add-multipart-upload/revision/79 -- don't have accompanying unit tests. It looks to me, from perusing the patches, that this should be sufficient to support multipart upload to S3. -- https://code.launchpad.net/~dkeeney/txaws/add-multipart-upload/+merge/76647 Your team txAWS Developers is requested to review the proposed merge of lp:~dkeeney/txaws/add-multipart-upload into lp:txaws.
=== added file 'bin/txaws-delete-upload' --- bin/txaws-delete-upload 1970-01-01 00:00:00 +0000 +++ bin/txaws-delete-upload 2011-09-22 20:23:28 +0000 @@ -0,0 +1,45 @@ +#!/usr/bin/env python +""" +%prog [options] +""" + +import sys + +from txaws.credentials import AWSCredentials +from txaws.script import parse_options +from txaws.service import AWSServiceRegion +from txaws.reactor import reactor + +def printResults(results): + #print results + return 0 + +def printError(error): + print error.value + return 1 + +def finish(return_code): + reactor.stop(exitStatus=return_code) + +options, args = parse_options(__doc__.strip()) +if options.bucket is None: + print "Error Message: A bucket name is required." + sys.exit(1) +elif options.object_name is None: + print "Error Message: An object name is required." + sys.exit(1) +elif options.uploadid is None: + print "Error Message: An uploadId (-u) is required." + sys.exit(1) +creds = AWSCredentials(options.access_key, options.secret_key) +region = AWSServiceRegion( + creds=creds, region=options.region, s3_uri=options.url) +client = region.get_s3_client() + +d = client.abort_object(options.bucket, options.object_name, options.uploadid) +d.addCallback(printResults) +d.addErrback(printError) +d.addCallback(finish) +# We use a custom reactor so that we can return the exit status from +# reactor.run(). 
+sys.exit(reactor.run()) === added file 'bin/txaws-list-uploads' --- bin/txaws-list-uploads 1970-01-01 00:00:00 +0000 +++ bin/txaws-list-uploads 2011-09-22 20:23:28 +0000 @@ -0,0 +1,40 @@ +#!/usr/bin/env python +""" +%prog [options] +""" + +import sys + +from txaws.credentials import AWSCredentials +from txaws.script import parse_options +from txaws.service import AWSServiceRegion +from txaws.reactor import reactor + + +def printResults(results): + print "\nUploads:" + for upload in results: + print "%s\n %s (%s)" %(upload[0],upload[2],upload[1]) + print "Total uploads: %s\n" % len(list(results)) + return 0 + +def printError(error): + print error.value + return 1 + +def finish(return_code): + reactor.stop(exitStatus=return_code) + +options, args = parse_options(__doc__.strip()) +creds = AWSCredentials(options.access_key, options.secret_key) +region = AWSServiceRegion( + creds=creds, region=options.region, s3_uri=options.url) +client = region.get_s3_client() + +d = client.list_mpuploads(options.bucket) +d.addCallback(printResults) +d.addErrback(printError) +d.addCallback(finish) +# We use a custom reactor so that we can return the exit status from +# reactor.run(). 
+sys.exit(reactor.run()) === added file 'bin/txaws-post-upload' --- bin/txaws-post-upload 1970-01-01 00:00:00 +0000 +++ bin/txaws-post-upload 2011-09-22 20:23:28 +0000 @@ -0,0 +1,86 @@ +#!/usr/bin/env python +""" +%prog [options] +""" + +import os +import sys +import StringIO + +from txaws.credentials import AWSCredentials +from txaws.script import parse_options +from txaws.service import AWSServiceRegion +from txaws.reactor import reactor +from txaws.s3.multipart import MultipartManager + +def printResults(results): + print results + print 'next part num:', mgr.partNumber() + return 0 + +def printError(error): + print error.value + return 1 + +def finish(return_code): + reactor.stop(exitStatus=return_code) + + +options, args = parse_options(__doc__.strip()) +if options.bucket is None: + print "Error Message: A bucket name is required." + sys.exit(1) + +filename = options.object_filename +if filename: + options.object_name = os.path.basename(filename) + try: + options.object_data = open(filename,'rb') + except Exception, error: + print error + sys.exit(1) +elif options.object_name is None: + print "Error Message: An object name is required." 
+ sys.exit(1) +else: + # turn input data into file-like obj + options.object_data = StringIO.StringIO(options.object_data) + options.object_data.seek(0) + +creds = AWSCredentials(options.access_key, options.secret_key) +region = AWSServiceRegion(creds=creds, region=options.region, + s3_uri=options.url) +client = region.get_s3_client() + +mgr = MultipartManager(client, options.bucket, options.object_name, + options.content_type) + +# send first block of data +def startdata(uploadId): + if uploadId: + senddata() + else: + print 'uploadId not obtained' + reactor.stop(exitStatus=1) + +# send each block of data +def senddata(ign=None): + tosend = options.object_data.read(6000000) + if tosend: + d = mgr.send_part(tosend) + d.addErrback(printError) + d.addCallback(senddata) + else: + d = mgr.finish() + d.addCallback(printResults) + d.addCallback(finish) + d.addErrback(printError) + +# start init +d = mgr.initialize() +d.addCallback(startdata) +d.addErrback(printError) + +# We use a custom reactor so that we can return the exit status from +# reactor.run(). 
+sys.exit(reactor.run()) === modified file 'bin/txaws-put-object' --- bin/txaws-put-object 2011-04-13 03:23:49 +0000 +++ bin/txaws-put-object 2011-09-22 20:23:28 +0000 @@ -33,7 +33,7 @@ if filename: options.object_name = os.path.basename(filename) try: - options.object_data = open(filename).read() + options.object_data = open(filename,'rb').read() except Exception, error: print error sys.exit(1) === modified file 'txaws/client/base.py' --- txaws/client/base.py 2011-04-21 21:16:37 +0000 +++ txaws/client/base.py 2011-09-22 20:23:28 +0000 @@ -1,7 +1,12 @@ +<<<<<<< TREE try: from xml.etree.ElementTree import ParseError except ImportError: from xml.parsers.expat import ExpatError as ParseError +======= + +from xml.parsers.expat import ExpatError +>>>>>>> MERGE-SOURCE from twisted.internet import reactor, ssl from twisted.web import http @@ -91,6 +96,7 @@ """ contextFactory = None scheme, host, port, path = parse(url) + #print >> sys.stderr, 'gp:'+str(path) self.client = self.factory(url, *args, **kwds) if scheme == 'https': contextFactory = ssl.ClientContextFactory() === modified file 'txaws/s3/client.py' --- txaws/s3/client.py 2011-04-14 19:50:30 +0000 +++ txaws/s3/client.py 2011-09-22 20:23:28 +0000 @@ -12,8 +12,13 @@ functionality in this wrapper. 
""" import mimetypes +import time from twisted.web.http import datetimeToString +from twisted.web2 import stream, http_headers +from twisted.web2 import http as web2_http +from twisted.web2.client import http as web2_client_http +from twisted.internet import protocol, reactor, ssl from epsilon.extime import Time @@ -23,11 +28,14 @@ Bucket, BucketItem, BucketListing, ItemOwner, RequestPayment) from txaws.s3.exception import S3Error from txaws.service import AWSServiceEndpoint, S3_ENDPOINT -from txaws.util import XML, calculate_md5 +from txaws.util import XML, calculate_md5, parse def s3_error_wrapper(error): - error_wrapper(error, S3Error) + print '\n' + print str(error.__dict__) + print str(error) + #error_wrapper(error, S3Error) class URLContext(object): @@ -64,9 +72,15 @@ class S3Client(BaseClient): """A client for S3.""" - def __init__(self, creds=None, endpoint=None, query_factory=None): + def __init__(self, creds=None, endpoint=None, query_factory=None, streaming_query_factory=None): if query_factory is None: query_factory = Query + + if streaming_query_factory is None: + streaming_query_factory = StreamingQuery + + self.streaming_query_factory = streaming_query_factory + super(S3Client, self).__init__(creds, endpoint, query_factory) def list_buckets(self): @@ -257,6 +271,15 @@ bucket=bucket, object_name=object_name) return query.submit() + def get_object_stream(self, bucket, object_name): + """ + Get an object from a bucket. + """ + query = self.streaming_query_factory( + action="GET", creds=self.creds, endpoint=self.endpoint, + bucket=bucket, object_name=object_name) + return query.submit() + def head_object(self, bucket, object_name): """ Retrieve object metadata only. @@ -278,6 +301,7 @@ bucket=bucket, object_name=object_name) return query.submit() +<<<<<<< TREE def get_object_acl(self, bucket, object_name): """ Get the access control policy for an object. 
@@ -320,6 +344,140 @@ """ return RequestPayment.from_xml(xml_bytes).payer +======= + def post_object_init(self, bucket, object_name, content_type=None, + metadata={}): + """ + Starts a multipart upload to a bucket. + Any existing object of the same name will be replaced. + + returns a string uploadId + """ + objectname_plus = object_name+'?uploads' + query = self.query_factory( + action="POST", creds=self.creds, endpoint=self.endpoint, + bucket=bucket, object_name=objectname_plus, data='', + content_type=content_type, metadata=metadata) + d = query.submit() + return d.addCallback(self._parse_post_init) + + def _parse_post_init(self, xml_bytes): + """parse the response to post_object_init""" + root = XML(xml_bytes) + uploadId = root.findtext("UploadId") + return uploadId + + def put_object_part(self,bucket,object_name,uploadId,data,partNumber, + content_type=None,metadata={}): + """ + Continues a multipart upload to a bucket. + Sends another slug of data + + returns deferred from query.submit + """ + assert(uploadId,'start multipart upload with a .post_object_init') + # this is backwards from docs, but it works + parms = 'partNumber='+str(partNumber)+'&uploadId='+uploadId + objectname_plus = object_name+'?'+parms + query = self.query_factory( + action="PUT", creds=self.creds, endpoint=self.endpoint, + bucket=bucket, object_name=objectname_plus, data=data, + content_type=content_type, metadata=metadata) + d = query.submit() + d.addCallback(query.get_response_headers) + return d + + def abort_object(self,bucket,object_name,uploadId, + content_type=None,metadata={}): + """ + Aborts a multipart upload to a bucket. 
+ + returns deferred from query.submit + """ + assert(uploadId,'cannot abort an unidentified upload') + objectname_plus = object_name+'?uploadId='+uploadId + query = self.query_factory( + action="DELETE", creds=self.creds, endpoint=self.endpoint, + bucket=bucket, object_name=objectname_plus, data='', + content_type=content_type, metadata=metadata) + d = query.submit() + return d + + def post_object_finish(self, bucket, object_name, uploadId, partsList, + content_type=None, metadata={}): + """ + Completes the multipart upload. + + Can be slow, returns deferred + """ + assert(uploadId,'start multipart upload with a .post_object_init') + # assemble xml + body = self._buildxmlPartsList(partsList) + # + objectname_plus = object_name+'?uploadId='+uploadId + query = self.query_factory( + action="POST", creds=self.creds, endpoint=self.endpoint, + bucket=bucket, object_name=objectname_plus, data=body, + content_type=content_type, metadata=metadata) + d = query.submit() + d.addCallback(self._parse_finish_response) + return d + + def _buildxmlPartsList(self,partsList): + xml = [] + partsList.sort(key=lambda p: p[0]) + xml.append('<CompleteMultipartUpload>') + for pt in partsList: + xml.append('<Part>') + xmlp = ''.join(['<PartNumber>',pt[0],'</PartNumber>']) + xml.append(xmlp) + xmlp = ''.join(['<ETag>',pt[1],'</ETag>']) + xml.append(xmlp) + xml.append('</Part>') + xml.append('</CompleteMultipartUpload>') + return '\n'.join(xml) + + def _parse_finish_response(self,xml_bytes): + """parse the response to post_object_init""" + root = XML(xml_bytes) + errorNode = root.find("Error") + uploadRes = root.findtext("Key") + if errorNode: + error = errorNode.findtext('Message') + raise txaws.s3.exception.S3Error('Error: %s'%error) + if uploadRes: + return 'multipart upload complete %s'%uploadRes + raise txaws.s3.exception.S3Error( + 'multipart upload finish did not return valid page: \n%s'%xml_bytes) + + def list_mpuploads(self, bucket, content_type=None, metadata={}): + """ + Gets a 
list of started but not finished multipart uploads in a bucket. + + returns a list + """ + path = '?uploads' + query = self.query_factory( + action="GET", creds=self.creds, endpoint=self.endpoint, + bucket=bucket, object_name=path, data='', + content_type=content_type, metadata=metadata) + d = query.submit() + return d.addCallback(self._parse_mpupload_list) + + def _parse_mpupload_list(self, xml_bytes): + """ + Parse XML multipart upload list response. + """ + root = XML(xml_bytes) + uploads = [] + for uploads_data in root.findall("Upload"): + uploadId = uploads_data.findtext("UploadId") + key = uploads_data.findtext("Key") + initdate = uploads_data.findtext("Initiated") + uploads.append((uploadId,initdate,key)) + return uploads + +>>>>>>> MERGE-SOURCE class Query(BaseQuery): """A query for submission to the S3 service.""" @@ -417,4 +575,47 @@ d = self.get_page( url_context.get_url(), method=self.action, postdata=self.data, headers=self.get_headers()) - return d.addErrback(s3_error_wrapper) + return d + #return d.addErrback(s3_error_wrapper) + +class StreamingQuery(Query): + def get_page(self, url, method="GET", postdata=None, headers=None, + agent=None, timeout=0, cookies=None, + followRedirect=True, redirectLimit=20, afterFoundGet=False): + scheme, host, port, path = parse(url) + + postdata = stream.MemoryStream(postdata) + + rawHeaders = {} + + for name, value in headers.items(): + rawHeaders[name] = [str(value)] + + headers = http_headers.Headers(rawHeaders=rawHeaders) + + request = web2_client_http.ClientRequest(method, url, headers, None) + + client = protocol.ClientCreator(reactor, web2_client_http.HTTPClientProtocol) + + if scheme == 'https': + contextFactory = ssl.ClientContextFactory() + deferred = client.connectSSL(host, port, contextFactory) + else: + deferred = client.connectTCP(host, port) + + def connected(proto): + d = proto.submitRequest(request) + d.addCallback(handleResponse) + + return d + + def handleResponse(response): + if not response.code 
in (200, 204): + error = web2_http.HTTPError(response) + raise error + + return response.stream + + deferred.addCallback(connected) + + return deferred === added file 'txaws/s3/multipart.py' --- txaws/s3/multipart.py 1970-01-01 00:00:00 +0000 +++ txaws/s3/multipart.py 2011-09-22 20:23:28 +0000 @@ -0,0 +1,81 @@ + + + +class MultipartManager(object): + """A client for S3.""" + + def __init__(self, client, bucket=None, object_name=None, content_type=None, + metadata={}): + self.client = client + self.object_name = object_name + self.bucket = bucket + self.content_type = content_type + self.metadata = metadata + self.uploadId = False + self.partTuples = [] + + def initialize(self): + """ + Starts a multipart upload to a bucket. + Any existing object of the same name will be replaced. + + returns a deferred + """ + def saveUploadId(uploadId): + self.uploadId = uploadId + return uploadId + d = self.client.post_object_init(bucket=self.bucket, + object_name=self.object_name, + content_type=self.content_type, + metadata=self.metadata) + return d.addCallback(saveUploadId) + + def partNumber(self): + return str(len(self.partTuples)+1) + + def send_part(self, data): + """ + Continues a multipart upload to a bucket. + Sends another slug of data + + returns deferred from query.submit + """ + assert(self.uploadId,'start multipart upload with a .post_object_init') + def process_headers(hdrs): + # save eTags and partNumbers for use by finish + etag = hdrs['etag'][0].strip('""') + partNum = self.partNumber() + tupl = (partNum,etag) + self.partTuples.append(tupl) + return hdrs + d = self.client.put_object_part(bucket=self.bucket, + object_name=self.object_name, + uploadId=self.uploadId, data=data, + partNumber=self.partNumber()) + d.addCallback(process_headers) + return d + + def abort(self): + """ + Aborts the multipart upload. 
+ + Returns deferred + """ + d = self.client.abort_object(bucket=self.bucket, + object_name=self.object_name, + uploadId=self.uploadId) + return d + + def finish(self): + """ + Completes the multipart upload. + + Can be slow, returns deferred + """ + d = self.client.post_object_finish(bucket=self.bucket, + object_name=self.object_name, + uploadId=self.uploadId, + partsList=self.partTuples) + return d + + === modified file 'txaws/script.py' --- txaws/script.py 2009-11-22 21:53:54 +0000 +++ txaws/script.py 2011-09-22 20:23:28 +0000 @@ -32,6 +32,9 @@ parser.add_option( "-c", "--content-type", dest="content_type", help="content type of the object") + parser.add_option( + "-u", "--upload-id", dest="uploadid", + help="upload id of the object") options, args = parser.parse_args() if not (options.access_key and options.secret_key): parser.error(
_______________________________________________ Mailing list: https://launchpad.net/~txaws-dev Post to : txaws-dev@lists.launchpad.net Unsubscribe : https://launchpad.net/~txaws-dev More help : https://help.launchpad.net/ListHelp