http://git-wip-us.apache.org/repos/asf/libcloud/blob/8afcda91/apache-libcloud-1.0.0rc2/libcloud/storage/base.py ---------------------------------------------------------------------- diff --git a/apache-libcloud-1.0.0rc2/libcloud/storage/base.py b/apache-libcloud-1.0.0rc2/libcloud/storage/base.py deleted file mode 100644 index f13dd0a..0000000 --- a/apache-libcloud-1.0.0rc2/libcloud/storage/base.py +++ /dev/null @@ -1,831 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -""" -Provides base classes for working with storage -""" - -# Backward compatibility for Python 2.5 -from __future__ import with_statement - -import os.path # pylint: disable-msg=W0404 -import hashlib -from os.path import join as pjoin - -from libcloud.utils.py3 import httplib -from libcloud.utils.py3 import next -from libcloud.utils.py3 import b - -import libcloud.utils.files -from libcloud.common.types import LibcloudError -from libcloud.common.base import ConnectionUserAndKey, BaseDriver -from libcloud.storage.types import ObjectDoesNotExistError - -__all__ = [ - 'Object', - 'Container', - 'StorageDriver', - - 'CHUNK_SIZE', - 'DEFAULT_CONTENT_TYPE' -] - -CHUNK_SIZE = 8096 - -# Default Content-Type which is sent when uploading an object if one is not -# supplied and can't be detected when using non-strict mode. -DEFAULT_CONTENT_TYPE = 'application/octet-stream' - - -class Object(object): - """ - Represents an object (BLOB). - """ - - def __init__(self, name, size, hash, extra, meta_data, container, - driver): - """ - :param name: Object name (must be unique per container). - :type name: ``str`` - - :param size: Object size in bytes. - :type size: ``int`` - - :param hash: Object hash. - :type hash: ``str`` - - :param container: Object container. - :type container: :class:`Container` - - :param extra: Extra attributes. - :type extra: ``dict`` - - :param meta_data: Optional object meta data. - :type meta_data: ``dict`` - - :param driver: StorageDriver instance. 
- :type driver: :class:`StorageDriver` - """ - - self.name = name - self.size = size - self.hash = hash - self.container = container - self.extra = extra or {} - self.meta_data = meta_data or {} - self.driver = driver - - def get_cdn_url(self): - return self.driver.get_object_cdn_url(obj=self) - - def enable_cdn(self, **kwargs): - return self.driver.enable_object_cdn(obj=self, **kwargs) - - def download(self, destination_path, overwrite_existing=False, - delete_on_failure=True): - return self.driver.download_object(self, destination_path, - overwrite_existing, - delete_on_failure) - - def as_stream(self, chunk_size=None): - return self.driver.download_object_as_stream(self, chunk_size) - - def delete(self): - return self.driver.delete_object(self) - - def __repr__(self): - return ('<Object: name=%s, size=%s, hash=%s, provider=%s ...>' % - (self.name, self.size, self.hash, self.driver.name)) - - -class Container(object): - """ - Represents a container (bucket) which can hold multiple objects. - """ - - def __init__(self, name, extra, driver): - """ - :param name: Container name (must be unique). - :type name: ``str`` - - :param extra: Extra attributes. - :type extra: ``dict`` - - :param driver: StorageDriver instance. 
- :type driver: :class:`StorageDriver` - """ - - self.name = name - self.extra = extra or {} - self.driver = driver - - def iterate_objects(self): - return self.driver.iterate_container_objects(container=self) - - def list_objects(self): - return self.driver.list_container_objects(container=self) - - def get_cdn_url(self): - return self.driver.get_container_cdn_url(container=self) - - def enable_cdn(self, **kwargs): - return self.driver.enable_container_cdn(container=self, **kwargs) - - def get_object(self, object_name): - return self.driver.get_object(container_name=self.name, - object_name=object_name) - - def upload_object(self, file_path, object_name, extra=None, **kwargs): - return self.driver.upload_object( - file_path, self, object_name, extra=extra, **kwargs) - - def upload_object_via_stream(self, iterator, object_name, extra=None, - **kwargs): - return self.driver.upload_object_via_stream( - iterator, self, object_name, extra=extra, **kwargs) - - def download_object(self, obj, destination_path, overwrite_existing=False, - delete_on_failure=True): - return self.driver.download_object( - obj, destination_path, overwrite_existing=overwrite_existing, - delete_on_failure=delete_on_failure) - - def download_object_as_stream(self, obj, chunk_size=None): - return self.driver.download_object_as_stream(obj, chunk_size) - - def delete_object(self, obj): - return self.driver.delete_object(obj) - - def delete(self): - return self.driver.delete_container(self) - - def __repr__(self): - return ('<Container: name=%s, provider=%s>' - % (self.name, self.driver.name)) - - -class StorageDriver(BaseDriver): - """ - A base StorageDriver to derive from. 
- """ - - connectionCls = ConnectionUserAndKey - name = None - hash_type = 'md5' - supports_chunked_encoding = False - - # When strict mode is used, exception will be thrown if no content type is - # provided and none can be detected when uploading an object - strict_mode = False - - def iterate_containers(self): - """ - Return a generator of containers for the given account - - :return: A generator of Container instances. - :rtype: ``generator`` of :class:`Container` - """ - raise NotImplementedError( - 'iterate_containers not implemented for this driver') - - def list_containers(self): - """ - Return a list of containers. - - :return: A list of Container instances. - :rtype: ``list`` of :class:`Container` - """ - return list(self.iterate_containers()) - - def iterate_container_objects(self, container): - """ - Return a generator of objects for the given container. - - :param container: Container instance - :type container: :class:`Container` - - :return: A generator of Object instances. - :rtype: ``generator`` of :class:`Object` - """ - raise NotImplementedError( - 'iterate_container_objects not implemented for this driver') - - def list_container_objects(self, container): - """ - Return a list of objects for the given container. - - :param container: Container instance. - :type container: :class:`Container` - - :return: A list of Object instances. - :rtype: ``list`` of :class:`Object` - """ - return list(self.iterate_container_objects(container)) - - def get_container(self, container_name): - """ - Return a container instance. - - :param container_name: Container name. - :type container_name: ``str`` - - :return: :class:`Container` instance. - :rtype: :class:`Container` - """ - raise NotImplementedError( - 'get_object not implemented for this driver') - - def get_container_cdn_url(self, container): - """ - Return a container CDN URL. - - :param container: Container instance - :type container: :class:`Container` - - :return: A CDN URL for this container. 
- :rtype: ``str`` - """ - raise NotImplementedError( - 'get_container_cdn_url not implemented for this driver') - - def get_object(self, container_name, object_name): - """ - Return an object instance. - - :param container_name: Container name. - :type container_name: ``str`` - - :param object_name: Object name. - :type object_name: ``str`` - - :return: :class:`Object` instance. - :rtype: :class:`Object` - """ - raise NotImplementedError( - 'get_object not implemented for this driver') - - def get_object_cdn_url(self, obj): - """ - Return an object CDN URL. - - :param obj: Object instance - :type obj: :class:`Object` - - :return: A CDN URL for this object. - :rtype: ``str`` - """ - raise NotImplementedError( - 'get_object_cdn_url not implemented for this driver') - - def enable_container_cdn(self, container): - """ - Enable container CDN. - - :param container: Container instance - :type container: :class:`Container` - - :rtype: ``bool`` - """ - raise NotImplementedError( - 'enable_container_cdn not implemented for this driver') - - def enable_object_cdn(self, obj): - """ - Enable object CDN. - - :param obj: Object instance - :type obj: :class:`Object` - - :rtype: ``bool`` - """ - raise NotImplementedError( - 'enable_object_cdn not implemented for this driver') - - def download_object(self, obj, destination_path, overwrite_existing=False, - delete_on_failure=True): - """ - Download an object to the specified destination path. - - :param obj: Object instance. - :type obj: :class:`Object` - - :param destination_path: Full path to a file or a directory where the - incoming file will be saved. - :type destination_path: ``str`` - - :param overwrite_existing: True to overwrite an existing file, - defaults to False. - :type overwrite_existing: ``bool`` - - :param delete_on_failure: True to delete a partially downloaded file if - the download was not successful (hash - mismatch / file size). 
- :type delete_on_failure: ``bool`` - - :return: True if an object has been successfully downloaded, False - otherwise. - :rtype: ``bool`` - """ - raise NotImplementedError( - 'download_object not implemented for this driver') - - def download_object_as_stream(self, obj, chunk_size=None): - """ - Return a generator which yields object data. - - :param obj: Object instance - :type obj: :class:`Object` - - :param chunk_size: Optional chunk size (in bytes). - :type chunk_size: ``int`` - """ - raise NotImplementedError( - 'download_object_as_stream not implemented for this driver') - - def upload_object(self, file_path, container, object_name, extra=None, - verify_hash=True, headers=None): - """ - Upload an object currently located on a disk. - - :param file_path: Path to the object on disk. - :type file_path: ``str`` - - :param container: Destination container. - :type container: :class:`Container` - - :param object_name: Object name. - :type object_name: ``str`` - - :param verify_hash: Verify hash - :type verify_hash: ``bool`` - - :param extra: Extra attributes (driver specific). (optional) - :type extra: ``dict`` - - :param headers: (optional) Additional request headers, - such as CORS headers. For example: - headers = {'Access-Control-Allow-Origin': 'http://mozilla.com'} - :type headers: ``dict`` - - :rtype: :class:`Object` - """ - raise NotImplementedError( - 'upload_object not implemented for this driver') - - def upload_object_via_stream(self, iterator, container, - object_name, - extra=None, - headers=None): - """ - Upload an object using an iterator. - - If a provider supports it, chunked transfer encoding is used and you - don't need to know in advance the amount of data to be uploaded. - - Otherwise if a provider doesn't support it, iterator will be exhausted - so a total size for data to be uploaded can be determined. 
- - Note: Exhausting the iterator means that the whole data must be - buffered in memory which might result in memory exhausting when - uploading a very large object. - - If a file is located on a disk you are advised to use upload_object - function which uses fs.stat function to determine the file size and it - doesn't need to buffer whole object in the memory. - - :param iterator: An object which implements the iterator interface. - :type iterator: :class:`object` - - :param container: Destination container. - :type container: :class:`Container` - - :param object_name: Object name. - :type object_name: ``str`` - - :param extra: (optional) Extra attributes (driver specific). Note: - This dictionary must contain a 'content_type' key which represents - a content type of the stored object. - :type extra: ``dict`` - - :param headers: (optional) Additional request headers, - such as CORS headers. For example: - headers = {'Access-Control-Allow-Origin': 'http://mozilla.com'} - :type headers: ``dict`` - - :rtype: ``object`` - """ - raise NotImplementedError( - 'upload_object_via_stream not implemented for this driver') - - def delete_object(self, obj): - """ - Delete an object. - - :param obj: Object instance. - :type obj: :class:`Object` - - :return: ``bool`` True on success. - :rtype: ``bool`` - """ - raise NotImplementedError( - 'delete_object not implemented for this driver') - - def create_container(self, container_name): - """ - Create a new container. - - :param container_name: Container name. - :type container_name: ``str`` - - :return: Container instance on success. - :rtype: :class:`Container` - """ - raise NotImplementedError( - 'create_container not implemented for this driver') - - def delete_container(self, container): - """ - Delete a container. - - :param container: Container instance - :type container: :class:`Container` - - :return: ``True`` on success, ``False`` otherwise. 
- :rtype: ``bool`` - """ - raise NotImplementedError( - 'delete_container not implemented for this driver') - - def _get_object(self, obj, callback, callback_kwargs, response, - success_status_code=None): - """ - Call passed callback and start transfer of the object' - - :param obj: Object instance. - :type obj: :class:`Object` - - :param callback: Function which is called with the passed - callback_kwargs - :type callback: :class:`function` - - :param callback_kwargs: Keyword arguments which are passed to the - callback. - :type callback_kwargs: ``dict`` - - :param response: Response instance. - :type response: :class:`Response` - - :param success_status_code: Status code which represents a successful - transfer (defaults to httplib.OK) - :type success_status_code: ``int`` - - :return: ``True`` on success, ``False`` otherwise. - :rtype: ``bool`` - """ - success_status_code = success_status_code or httplib.OK - - if response.status == success_status_code: - return callback(**callback_kwargs) - elif response.status == httplib.NOT_FOUND: - raise ObjectDoesNotExistError(object_name=obj.name, - value='', driver=self) - - raise LibcloudError(value='Unexpected status code: %s' % - (response.status), - driver=self) - - def _save_object(self, response, obj, destination_path, - overwrite_existing=False, delete_on_failure=True, - chunk_size=None): - """ - Save object to the provided path. - - :param response: RawResponse instance. - :type response: :class:`RawResponse` - - :param obj: Object instance. - :type obj: :class:`Object` - - :param destination_path: Destination directory. - :type destination_path: ``str`` - - :param delete_on_failure: True to delete partially downloaded object if - the download fails. - :type delete_on_failure: ``bool`` - - :param overwrite_existing: True to overwrite a local path if it already - exists. 
- :type overwrite_existing: ``bool`` - - :param chunk_size: Optional chunk size - (defaults to ``libcloud.storage.base.CHUNK_SIZE``, 8kb) - :type chunk_size: ``int`` - - :return: ``True`` on success, ``False`` otherwise. - :rtype: ``bool`` - """ - - chunk_size = chunk_size or CHUNK_SIZE - - base_name = os.path.basename(destination_path) - - if not base_name and not os.path.exists(destination_path): - raise LibcloudError( - value='Path %s does not exist' % (destination_path), - driver=self) - - if not base_name: - file_path = pjoin(destination_path, obj.name) - else: - file_path = destination_path - - if os.path.exists(file_path) and not overwrite_existing: - raise LibcloudError( - value='File %s already exists, but ' % (file_path) + - 'overwrite_existing=False', - driver=self) - - stream = libcloud.utils.files.read_in_chunks(response, chunk_size) - - try: - data_read = next(stream) - except StopIteration: - # Empty response? - return False - - bytes_transferred = 0 - - with open(file_path, 'wb') as file_handle: - while len(data_read) > 0: - file_handle.write(b(data_read)) - bytes_transferred += len(data_read) - - try: - data_read = next(stream) - except StopIteration: - data_read = '' - - if int(obj.size) != int(bytes_transferred): - # Transfer failed, support retry? - if delete_on_failure: - try: - os.unlink(file_path) - except Exception: - pass - - return False - - return True - - def _upload_object(self, object_name, content_type, upload_func, - upload_func_kwargs, request_path, request_method='PUT', - headers=None, file_path=None, iterator=None): - """ - Helper function for setting common request headers and calling the - passed in callback which uploads an object. 
- """ - headers = headers or {} - - if file_path and not os.path.exists(file_path): - raise OSError('File %s does not exist' % (file_path)) - - if iterator is not None and not hasattr(iterator, 'next') and not \ - hasattr(iterator, '__next__'): - raise AttributeError('iterator object must implement next() ' + - 'method.') - - if not content_type: - if file_path: - name = file_path - else: - name = object_name - content_type, _ = libcloud.utils.files.guess_file_mime_type(name) - - if not content_type: - if self.strict_mode: - raise AttributeError('File content-type could not be ' - 'guessed and no content_type value ' - 'is provided') - else: - # Fallback to a content-type - content_type = DEFAULT_CONTENT_TYPE - - file_size = None - - if iterator: - if self.supports_chunked_encoding: - headers['Transfer-Encoding'] = 'chunked' - upload_func_kwargs['chunked'] = True - else: - # Chunked transfer encoding is not supported. Need to buffer - # all the data in memory so we can determine file size. 
- iterator = libcloud.utils.files.read_in_chunks( - iterator=iterator) - data = libcloud.utils.files.exhaust_iterator(iterator=iterator) - - file_size = len(data) - upload_func_kwargs['data'] = data - else: - file_size = os.path.getsize(file_path) - upload_func_kwargs['chunked'] = False - - if file_size is not None and 'Content-Length' not in headers: - headers['Content-Length'] = file_size - - headers['Content-Type'] = content_type - response = self.connection.request(request_path, - method=request_method, data=None, - headers=headers, raw=True) - - upload_func_kwargs['response'] = response - success, data_hash, bytes_transferred = upload_func( - **upload_func_kwargs) - - if not success: - raise LibcloudError( - value='Object upload failed, Perhaps a timeout?', driver=self) - - result_dict = {'response': response, 'data_hash': data_hash, - 'bytes_transferred': bytes_transferred} - return result_dict - - def _upload_data(self, response, data, calculate_hash=True): - """ - Upload data stored in a string. - - :param response: RawResponse object. - :type response: :class:`RawResponse` - - :param data: Data to upload. - :type data: ``str`` - - :param calculate_hash: True to calculate hash of the transferred data. - (defaults to True). - :type calculate_hash: ``bool`` - - :return: First item is a boolean indicator of success, second - one is the uploaded data MD5 hash and the third one - is the number of transferred bytes. - :rtype: ``tuple`` - """ - bytes_transferred = 0 - data_hash = None - - if calculate_hash: - data_hash = self._get_hash_function() - data_hash.update(b(data)) - - try: - response.connection.connection.send(b(data)) - except Exception: - # TODO: let this exception propagate - # Timeout, etc. 
- return False, None, bytes_transferred - - bytes_transferred = len(data) - - if calculate_hash: - data_hash = data_hash.hexdigest() - - return True, data_hash, bytes_transferred - - def _stream_data(self, response, iterator, chunked=False, - calculate_hash=True, chunk_size=None, data=None): - """ - Stream a data over an http connection. - - :param response: RawResponse object. - :type response: :class:`RawResponse` - - :param response: An object which implements an iterator interface - or a File like object with read method. - :type iterator: :class:`object` - - :param chunked: True if the chunked transfer encoding should be used - (defaults to False). - :type chunked: ``bool`` - - :param calculate_hash: True to calculate hash of the transferred data. - (defaults to True). - :type calculate_hash: ``bool`` - - :param chunk_size: Optional chunk size (defaults to ``CHUNK_SIZE``) - :type chunk_size: ``int`` - - :rtype: ``tuple`` - :return: First item is a boolean indicator of success, second - one is the uploaded data MD5 hash and the third one - is the number of transferred bytes. 
- """ - - chunk_size = chunk_size or CHUNK_SIZE - - data_hash = None - if calculate_hash: - data_hash = self._get_hash_function() - - generator = libcloud.utils.files.read_in_chunks(iterator, chunk_size, - fill_size=True) - - bytes_transferred = 0 - try: - chunk = next(generator) - except StopIteration: - # Special case when StopIteration is thrown on the first iteration - # create a 0-byte long object - chunk = '' - if chunked: - response.connection.connection.send(b('%X\r\n' % - (len(chunk)))) - response.connection.connection.send(chunk) - response.connection.connection.send(b('\r\n')) - response.connection.connection.send(b('0\r\n\r\n')) - else: - response.connection.connection.send(chunk) - return True, data_hash.hexdigest(), bytes_transferred - - while len(chunk) > 0: - try: - if chunked: - response.connection.connection.send(b('%X\r\n' % - (len(chunk)))) - response.connection.connection.send(b(chunk)) - response.connection.connection.send(b('\r\n')) - else: - response.connection.connection.send(b(chunk)) - except Exception: - # TODO: let this exception propagate - # Timeout, etc. - return False, None, bytes_transferred - - bytes_transferred += len(chunk) - if calculate_hash: - data_hash.update(b(chunk)) - - try: - chunk = next(generator) - except StopIteration: - chunk = '' - - if chunked: - response.connection.connection.send(b('0\r\n\r\n')) - - if calculate_hash: - data_hash = data_hash.hexdigest() - - return True, data_hash, bytes_transferred - - def _upload_file(self, response, file_path, chunked=False, - calculate_hash=True): - """ - Upload a file to the server. - - :type response: :class:`RawResponse` - :param response: RawResponse object. - - :type file_path: ``str`` - :param file_path: Path to a local file. - - :type iterator: :class:`object` - :param response: An object which implements an iterator interface (File - object, etc.) 
- - :rtype: ``tuple`` - :return: First item is a boolean indicator of success, second - one is the uploaded data MD5 hash and the third one - is the number of transferred bytes. - """ - with open(file_path, 'rb') as file_handle: - success, data_hash, bytes_transferred = ( - self._stream_data( - response=response, - iterator=iter(file_handle), - chunked=chunked, - calculate_hash=calculate_hash)) - - return success, data_hash, bytes_transferred - - def _get_hash_function(self): - """ - Return instantiated hash function for the hash type supported by - the provider. - """ - try: - func = getattr(hashlib, self.hash_type)() - except AttributeError: - raise RuntimeError('Invalid or unsupported hash type: %s' % - (self.hash_type)) - - return func
http://git-wip-us.apache.org/repos/asf/libcloud/blob/8afcda91/apache-libcloud-1.0.0rc2/libcloud/storage/drivers/__init__.py ---------------------------------------------------------------------- diff --git a/apache-libcloud-1.0.0rc2/libcloud/storage/drivers/__init__.py b/apache-libcloud-1.0.0rc2/libcloud/storage/drivers/__init__.py deleted file mode 100644 index fe8b04f..0000000 --- a/apache-libcloud-1.0.0rc2/libcloud/storage/drivers/__init__.py +++ /dev/null @@ -1,23 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Drivers for working with different providers -""" - -__all__ = [ - 'dummy', - 'cloudfiles' -] http://git-wip-us.apache.org/repos/asf/libcloud/blob/8afcda91/apache-libcloud-1.0.0rc2/libcloud/storage/drivers/atmos.py ---------------------------------------------------------------------- diff --git a/apache-libcloud-1.0.0rc2/libcloud/storage/drivers/atmos.py b/apache-libcloud-1.0.0rc2/libcloud/storage/drivers/atmos.py deleted file mode 100644 index c52be03..0000000 --- a/apache-libcloud-1.0.0rc2/libcloud/storage/drivers/atmos.py +++ /dev/null @@ -1,472 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. 
See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import sys -import base64 -import hashlib -import hmac -import time - -from libcloud.utils.py3 import PY3 -from libcloud.utils.py3 import b -from libcloud.utils.py3 import httplib -from libcloud.utils.py3 import next -from libcloud.utils.py3 import urlparse -from libcloud.utils.py3 import urlencode -from libcloud.utils.py3 import urlquote -from libcloud.utils.py3 import urlunquote - -if PY3: - from io import FileIO as file - -from libcloud.utils.files import read_in_chunks, guess_file_mime_type -from libcloud.common.base import ConnectionUserAndKey, XmlResponse -from libcloud.common.types import LibcloudError - -from libcloud.storage.base import Object, Container, StorageDriver, CHUNK_SIZE -from libcloud.storage.types import ContainerAlreadyExistsError, \ - ContainerDoesNotExistError, ContainerIsNotEmptyError, \ - ObjectDoesNotExistError - - -def collapse(s): - return ' '.join([x for x in s.split(' ') if x]) - - -class AtmosError(LibcloudError): - def __init__(self, code, message, driver=None): - super(AtmosError, self).__init__(value=message, driver=driver) - self.code = code - - -class AtmosResponse(XmlResponse): - def success(self): - return self.status in (httplib.OK, httplib.CREATED, httplib.NO_CONTENT, - httplib.PARTIAL_CONTENT) - - def parse_error(self): - tree = self.parse_body() - - 
if tree is None: - return None - - code = int(tree.find('Code').text) - message = tree.find('Message').text - raise AtmosError(code=code, message=message, - driver=self.connection.driver) - - -class AtmosConnection(ConnectionUserAndKey): - responseCls = AtmosResponse - - def add_default_headers(self, headers): - headers['x-emc-uid'] = self.user_id - headers['Date'] = time.strftime('%a, %d %b %Y %H:%M:%S GMT', - time.gmtime()) - headers['x-emc-date'] = headers['Date'] - - if 'Content-Type' not in headers: - headers['Content-Type'] = 'application/octet-stream' - if 'Accept' not in headers: - headers['Accept'] = '*/*' - - return headers - - def pre_connect_hook(self, params, headers): - headers['x-emc-signature'] = self._calculate_signature(params, headers) - - return params, headers - - def _calculate_signature(self, params, headers): - pathstring = urlunquote(self.action) - if pathstring.startswith(self.driver.path): - pathstring = pathstring[len(self.driver.path):] - if params: - if type(params) is dict: - params = list(params.items()) - pathstring += '?' 
# NOTE(review): this chunk of the archive dump begins mid-way through the
# request-signing helper of ``AtmosConnection`` (header canonicalisation +
# HMAC-SHA1 signing); that fragment starts outside this view and is not
# reproduced here.


class AtmosDriver(StorageDriver):
    """
    EMC Atmos storage driver.

    Talks to the Atmos REST *namespace* interface: containers map to
    top-level directories and objects to files inside them.  Atmos error
    code 1003 means "object not found", 1016 "object already exists" and
    1023 "directory not empty" — the exception handlers below key off
    those codes.
    """

    connectionCls = AtmosConnection

    host = None
    path = None
    api_name = 'atmos'
    supports_chunked_encoding = True
    website = 'http://atmosonline.com/'
    name = 'atmos'

    DEFAULT_CDN_TTL = 60 * 60 * 24 * 7  # 1 week

    def __init__(self, key, secret=None, secure=True, host=None, port=None):
        # Fall back to the class-level host when none is supplied.
        host = host or self.host
        super(AtmosDriver, self).__init__(key, secret, secure, host, port)

    def iterate_containers(self):
        """Yield a :class:`Container` for each directory under the root."""
        result = self.connection.request(self._namespace_path(''))
        entries = self._list_objects(result.object, object_type='directory')
        for entry in entries:
            extra = {
                'object_id': entry['id']
            }
            yield Container(entry['name'], extra, self)

    def get_container(self, container_name):
        """Return the named container or raise ContainerDoesNotExistError."""
        path = self._namespace_path(container_name) + '/?metadata/system'
        try:
            result = self.connection.request(path)
        except AtmosError:
            e = sys.exc_info()[1]  # py2.5-compatible way to grab the exception
            if e.code != 1003:
                raise
            raise ContainerDoesNotExistError(e, self, container_name)
        meta = self._emc_meta(result)
        extra = {
            'object_id': meta['objectid']
        }
        return Container(container_name, extra, self)

    def create_container(self, container_name):
        """Create a directory for the container; error 1016 = already there."""
        path = self._namespace_path(container_name) + '/'
        try:
            self.connection.request(path, method='POST')
        except AtmosError:
            e = sys.exc_info()[1]
            if e.code != 1016:
                raise
            raise ContainerAlreadyExistsError(e, self, container_name)
        return self.get_container(container_name)

    def delete_container(self, container):
        """Delete an (empty) container.  Returns ``True`` on success."""
        try:
            self.connection.request(self._namespace_path(container.name) + '/',
                                    method='DELETE')
        except AtmosError:
            e = sys.exc_info()[1]
            if e.code == 1003:
                raise ContainerDoesNotExistError(e, self, container.name)
            elif e.code == 1023:
                raise ContainerIsNotEmptyError(e, self, container.name)
            # FIX: previously any other Atmos error code fell through and the
            # method returned True, silently swallowing the failure.  Re-raise
            # instead, matching get_container()/delete_object() behaviour.
            raise
        return True

    def get_object(self, container_name, object_name):
        """Fetch system + user metadata and build an :class:`Object`."""
        container = self.get_container(container_name)
        object_name_cleaned = self._clean_object_name(object_name)
        path = self._namespace_path(container_name) + '/' + object_name_cleaned

        try:
            result = self.connection.request(path + '?metadata/system')
            system_meta = self._emc_meta(result)

            result = self.connection.request(path + '?metadata/user')
            user_meta = self._emc_meta(result)
        except AtmosError:
            e = sys.exc_info()[1]
            if e.code != 1003:
                raise
            raise ObjectDoesNotExistError(e, self, object_name)

        # Atmos reports mtime in ISO-8601; expose it as an HTTP-style date.
        last_modified = time.strptime(system_meta['mtime'],
                                      '%Y-%m-%dT%H:%M:%SZ')
        last_modified = time.strftime('%a, %d %b %Y %H:%M:%S GMT',
                                      last_modified)
        extra = {
            'object_id': system_meta['objectid'],
            'last_modified': last_modified
        }
        # The md5 is stored as user metadata on upload; treat it as the hash.
        data_hash = user_meta.pop('md5', '')
        return Object(object_name, int(system_meta['size']), data_hash, extra,
                      user_meta, container, self)

    def upload_object(self, file_path, container, object_name, extra=None,
                      verify_hash=True):
        """Upload a local file, then record its md5 as user metadata."""
        upload_func = self._upload_file
        upload_func_kwargs = {'file_path': file_path}
        method = 'PUT'

        extra = extra or {}
        object_name_cleaned = self._clean_object_name(object_name)
        request_path = self._namespace_path(container.name) + '/' +\
            object_name_cleaned
        content_type = extra.get('content_type', None)

        # If the object does not exist yet (1003), create it with POST
        # instead of overwriting with PUT.
        try:
            self.connection.request(request_path + '?metadata/system')
        except AtmosError:
            e = sys.exc_info()[1]
            if e.code != 1003:
                raise
            method = 'POST'

        result_dict = self._upload_object(
            object_name=object_name,
            content_type=content_type,
            upload_func=upload_func,
            upload_func_kwargs=upload_func_kwargs,
            request_path=request_path,
            request_method=method,
            headers={}, file_path=file_path)

        bytes_transferred = result_dict['bytes_transferred']

        # FIX: dropped a dead ``if extra is None`` branch — ``extra`` was
        # already defaulted to {} above, so the None case was unreachable.
        meta_data = extra.get('meta_data', {})
        meta_data['md5'] = result_dict['data_hash']
        user_meta = ', '.join([k + '=' + str(v) for k, v in
                               list(meta_data.items())])
        self.connection.request(request_path + '?metadata/user', method='POST',
                                headers={'x-emc-meta': user_meta})
        result = self.connection.request(request_path + '?metadata/system')
        meta = self._emc_meta(result)
        # md5 is surfaced via Object.hash, not meta_data.
        del meta_data['md5']
        extra = {
            'object_id': meta['objectid'],
            'meta_data': meta_data,
        }

        return Object(object_name, bytes_transferred, result_dict['data_hash'],
                      extra, meta_data, container, self)

    def upload_object_via_stream(self, iterator, container, object_name,
                                 extra=None):
        """Upload from an iterator, chunk by chunk, with ranged PUTs."""
        # NOTE(review): ``file`` is the Python 2 builtin (or a PY3 shim
        # presumably imported at the top of this module, outside this view)
        # — TODO confirm.
        if isinstance(iterator, file):
            iterator = iter(iterator)

        data_hash = hashlib.md5()
        generator = read_in_chunks(iterator, CHUNK_SIZE, True)
        bytes_transferred = 0
        try:
            chunk = next(generator)
        except StopIteration:
            chunk = ''

        path = self._namespace_path(container.name + '/' + object_name)
        method = 'PUT'

        if extra is not None:
            content_type = extra.get('content_type', None)
        else:
            content_type = None
        if not content_type:
            content_type, _ = guess_file_mime_type(object_name)

        if not content_type:
            raise AttributeError(
                'File content-type could not be guessed and' +
                ' no content_type value provided')

        # New object (1003) -> create with POST first; subsequent chunks
        # are appended with ranged PUTs below.
        try:
            self.connection.request(path + '?metadata/system')
        except AtmosError:
            e = sys.exc_info()[1]
            if e.code != 1003:
                raise
            method = 'POST'

        while True:
            end = bytes_transferred + len(chunk) - 1
            data_hash.update(b(chunk))
            headers = {
                'x-emc-meta': 'md5=' + data_hash.hexdigest(),
                'Content-Type': content_type,
            }

            if len(chunk) > 0 and bytes_transferred > 0:
                # Append after the first chunk using a byte-range PUT.
                headers['Range'] = 'Bytes=%d-%d' % (bytes_transferred, end)
                method = 'PUT'

            self.connection.request(path, method=method, data=chunk,
                                    headers=headers)
            bytes_transferred += len(chunk)

            try:
                chunk = next(generator)
            except StopIteration:
                break
            if len(chunk) == 0:
                break

        data_hash = data_hash.hexdigest()

        if extra is None:
            meta_data = {}
        else:
            meta_data = extra.get('meta_data', {})
        meta_data['md5'] = data_hash
        user_meta = ', '.join([k + '=' + str(v) for k, v in
                               list(meta_data.items())])
        self.connection.request(path + '?metadata/user', method='POST',
                                headers={'x-emc-meta': user_meta})

        result = self.connection.request(path + '?metadata/system')

        meta = self._emc_meta(result)
        extra = {
            'object_id': meta['objectid'],
            'meta_data': meta_data,
        }

        return Object(object_name, bytes_transferred, data_hash, extra,
                      meta_data, container, self)

    def download_object(self, obj, destination_path, overwrite_existing=False,
                        delete_on_failure=True):
        """Download an object to ``destination_path``."""
        path = self._namespace_path(obj.container.name + '/' + obj.name)
        response = self.connection.request(path, method='GET', raw=True)

        return self._get_object(obj=obj, callback=self._save_object,
                                response=response,
                                callback_kwargs={
                                    'obj': obj,
                                    'response': response.response,
                                    'destination_path': destination_path,
                                    'overwrite_existing': overwrite_existing,
                                    'delete_on_failure': delete_on_failure
                                },
                                success_status_code=httplib.OK)

    def download_object_as_stream(self, obj, chunk_size=None):
        """Return a generator streaming the object's bytes."""
        path = self._namespace_path(obj.container.name + '/' + obj.name)
        response = self.connection.request(path, method='GET', raw=True)

        return self._get_object(obj=obj, callback=read_in_chunks,
                                response=response,
                                callback_kwargs={
                                    'iterator': response.response,
                                    'chunk_size': chunk_size
                                },
                                success_status_code=httplib.OK)

    def delete_object(self, obj):
        """Delete an object.  Returns ``True`` on success."""
        path = self._namespace_path(obj.container.name) + '/' +\
            self._clean_object_name(obj.name)
        try:
            self.connection.request(path, method='DELETE')
        except AtmosError:
            e = sys.exc_info()[1]
            if e.code != 1003:
                raise
            raise ObjectDoesNotExistError(e, self, obj.name)
        return True

    def enable_object_cdn(self, obj):
        # Atmos shareable URLs need no explicit enablement.
        return True

    def get_object_cdn_url(self, obj, expiry=None, use_object=False):
        """
        Return an object CDN URL.

        :param obj: Object instance
        :type obj: :class:`Object`

        :param expiry: Expiry
        :type expiry: ``str``

        :param use_object: Use object
        :type use_object: ``bool``

        :rtype: ``str``
        """
        if use_object:
            path = '/rest/objects' + obj.meta_data['object_id']
        else:
            path = '/rest/namespace/' + obj.container.name + '/' + obj.name

        if self.secure:
            protocol = 'https'
        else:
            protocol = 'http'

        expiry = str(expiry or int(time.time()) + self.DEFAULT_CDN_TTL)
        params = [
            ('uid', self.key),
            ('expires', expiry),
        ]
        params.append(('signature', self._cdn_signature(path, params, expiry)))

        params = urlencode(params)
        path = self.path + path
        return urlparse.urlunparse((protocol, self.host, path, '', params, ''))

    def _cdn_signature(self, path, params, expiry):
        """HMAC-SHA1 sign a shareable-URL request; returns a ``str``."""
        key = base64.b64decode(self.secret)
        signature = '\n'.join(['GET', path.lower(), self.key, expiry])
        # FIX: wrap in b() and decode the b64 result so this works on
        # Python 3 as well (hmac requires bytes, and urlencode() would
        # otherwise embed "b'...'" in the URL).  Mirrors the byte-safe
        # signing done in AtmosConnection above.
        signature = hmac.new(b(key), b(signature), hashlib.sha1).digest()
        return base64.b64encode(b(signature)).decode('utf-8')

    def _list_objects(self, tree, object_type=None):
        """Parse a DirectoryList XML tree into dicts, filtered by type."""
        listing = tree.find(self._emc_tag('DirectoryList'))
        entries = []
        for entry in listing.findall(self._emc_tag('DirectoryEntry')):
            file_type = entry.find(self._emc_tag('FileType')).text
            if object_type is not None and object_type != file_type:
                continue
            entries.append({
                'id': entry.find(self._emc_tag('ObjectID')).text,
                'type': file_type,
                'name': entry.find(self._emc_tag('Filename')).text
            })
        return entries

    def _clean_object_name(self, name):
        return urlquote(name.encode('ascii'))

    def _namespace_path(self, path):
        return self.path + '/rest/namespace/' + urlquote(path.encode('ascii'))

    def _object_path(self, object_id):
        # NOTE(review): ``object_id.encode('ascii')`` yields ``bytes`` on
        # Python 3, which cannot be concatenated to ``str`` — presumably
        # this path is only exercised on Python 2.  TODO confirm before
        # changing.
        return self.path + '/rest/objects/' + object_id.encode('ascii')

    @staticmethod
    def _emc_tag(tag):
        # Qualify a tag with the EMC XML namespace.
        return '{http://www.emc.com/cos/}' + tag

    def _emc_meta(self, response):
        """Parse the comma-separated ``x-emc-meta`` header into a dict."""
        meta = response.headers.get('x-emc-meta', '')
        if len(meta) == 0:
            return {}
        meta = meta.split(', ')
        return dict([x.split('=', 1) for x in meta])

    def iterate_container_objects(self, container):
        """Yield an :class:`Object` for each regular file in the container."""
        headers = {'x-emc-include-meta': '1'}
        path = self._namespace_path(container.name) + '/'
        result = self.connection.request(path, headers=headers)
        entries = self._list_objects(result.object, object_type='regular')
        for entry in entries:
            metadata = {'object_id': entry['id']}
            yield Object(entry['name'], 0, '', {}, metadata, container, self)


# ----------------------------------------------------------------------
# Next deleted file in this archive dump:
# libcloud/storage/drivers/auroraobjects.py
# ----------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from libcloud.common.types import LibcloudError
from libcloud.storage.providers import Provider
from libcloud.storage.drivers.s3 import BaseS3StorageDriver, BaseS3Connection

__all__ = [
    'AuroraObjectsStorageDriver'
]

AURORA_OBJECTS_EU_HOST = 'o.auroraobjects.eu'

NO_CDN_SUPPORT_ERROR = 'CDN is not supported by AuroraObjects'


class BaseAuroraObjectsConnection(BaseS3Connection):
    """S3 connection pinned to the AuroraObjects EU endpoint."""
    host = AURORA_OBJECTS_EU_HOST


class BaseAuroraObjectsStorageDriver(BaseS3StorageDriver):
    """S3-compatible storage driver preconfigured for PCextreme AuroraObjects."""
    type = Provider.AURORAOBJECTS
    name = 'PCextreme AuroraObjects'
    website = 'https://www.pcextreme.com/aurora/objects'


class AuroraObjectsStorageDriver(BaseAuroraObjectsStorageDriver):
    connectionCls = BaseAuroraObjectsConnection

    def _no_cdn_support(self):
        # AuroraObjects exposes no CDN API, so every CDN entry point fails
        # with the same LibcloudError.
        raise LibcloudError(NO_CDN_SUPPORT_ERROR, driver=self)

    def enable_container_cdn(self, *argv):
        """Unsupported — always raises :class:`LibcloudError`."""
        self._no_cdn_support()

    def enable_object_cdn(self, *argv):
        """Unsupported — always raises :class:`LibcloudError`."""
        self._no_cdn_support()

    def get_container_cdn_url(self, *argv):
        """Unsupported — always raises :class:`LibcloudError`."""
        self._no_cdn_support()

    def get_object_cdn_url(self, *argv):
        """Unsupported — always raises :class:`LibcloudError`."""
        self._no_cdn_support()


# ----------------------------------------------------------------------
# Next deleted file in this archive dump:
# libcloud/storage/drivers/azure_blobs.py ---
a/apache-libcloud-1.0.0rc2/libcloud/storage/drivers/azure_blobs.py +++ /dev/null @@ -1,986 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import with_statement - -import base64 -import os -import binascii - -from xml.etree.ElementTree import Element, SubElement - -from libcloud.utils.py3 import PY3 -from libcloud.utils.py3 import httplib -from libcloud.utils.py3 import urlquote -from libcloud.utils.py3 import tostring -from libcloud.utils.py3 import b - -from libcloud.utils.xml import fixxpath -from libcloud.utils.files import read_in_chunks -from libcloud.common.types import LibcloudError -from libcloud.common.azure import AzureConnection - -from libcloud.storage.base import Object, Container, StorageDriver -from libcloud.storage.types import ContainerIsNotEmptyError -from libcloud.storage.types import ContainerAlreadyExistsError -from libcloud.storage.types import InvalidContainerNameError -from libcloud.storage.types import ContainerDoesNotExistError -from libcloud.storage.types import ObjectDoesNotExistError -from libcloud.storage.types import ObjectHashMismatchError - -if PY3: - from io import FileIO as file - -# Desired number of items in each response inside a paginated request -RESPONSES_PER_REQUEST = 
100 - -# As per the Azure documentation, if the upload file size is less than -# 64MB, we can upload it in a single request. However, in real life azure -# servers seem to disconnect randomly after around 5 MB or 200s of upload. -# So, it is better that for file sizes greater than 4MB, we upload it in -# chunks. -# Also, with large sizes, if we use a lease, the lease will timeout after -# 60 seconds, but the upload might still be in progress. This can be -# handled in code, but if we use chunked uploads, the lease renewal will -# happen automatically. -AZURE_BLOCK_MAX_SIZE = 4 * 1024 * 1024 - -# Azure block blocks must be maximum 4MB -# Azure page blobs must be aligned in 512 byte boundaries (4MB fits that) -AZURE_CHUNK_SIZE = 4 * 1024 * 1024 - -# Azure page blob must be aligned in 512 byte boundaries -AZURE_PAGE_CHUNK_SIZE = 512 - -# The time period (in seconds) for which a lease must be obtained. -# If set as -1, we get an infinite lease, but that is a bad idea. If -# after getting an infinite lease, there was an issue in releasing the -# lease, the object will remain 'locked' forever, unless the lease is -# released using the lease_id (which is not exposed to the user) -AZURE_LEASE_PERIOD = 60 - -AZURE_STORAGE_HOST_SUFFIX = 'blob.core.windows.net' - - -class AzureBlobLease(object): - """ - A class to help in leasing an azure blob and renewing the lease - """ - def __init__(self, driver, object_path, use_lease): - """ - :param driver: The Azure storage driver that is being used - :type driver: :class:`AzureStorageDriver` - - :param object_path: The path of the object we need to lease - :type object_path: ``str`` - - :param use_lease: Indicates if we must take a lease or not - :type use_lease: ``bool`` - """ - self.object_path = object_path - self.driver = driver - self.use_lease = use_lease - self.lease_id = None - self.params = {'comp': 'lease'} - - def renew(self): - """ - Renew the lease if it is older than a predefined time period - """ - if self.lease_id is 
None: - return - - headers = {'x-ms-lease-action': 'renew', - 'x-ms-lease-id': self.lease_id, - 'x-ms-lease-duration': '60'} - - response = self.driver.connection.request(self.object_path, - headers=headers, - params=self.params, - method='PUT') - - if response.status != httplib.OK: - raise LibcloudError('Unable to obtain lease', driver=self) - - def update_headers(self, headers): - """ - Update the lease id in the headers - """ - if self.lease_id: - headers['x-ms-lease-id'] = self.lease_id - - def __enter__(self): - if not self.use_lease: - return self - - headers = {'x-ms-lease-action': 'acquire', - 'x-ms-lease-duration': '60'} - - response = self.driver.connection.request(self.object_path, - headers=headers, - params=self.params, - method='PUT') - - if response.status == httplib.NOT_FOUND: - return self - elif response.status != httplib.CREATED: - raise LibcloudError('Unable to obtain lease', driver=self) - - self.lease_id = response.headers['x-ms-lease-id'] - return self - - def __exit__(self, type, value, traceback): - if self.lease_id is None: - return - - headers = {'x-ms-lease-action': 'release', - 'x-ms-lease-id': self.lease_id} - response = self.driver.connection.request(self.object_path, - headers=headers, - params=self.params, - method='PUT') - - if response.status != httplib.OK: - raise LibcloudError('Unable to release lease', driver=self) - - -class AzureBlobsConnection(AzureConnection): - """ - Represents a single connection to Azure Blobs - """ - - -class AzureBlobsStorageDriver(StorageDriver): - name = 'Microsoft Azure (blobs)' - website = 'http://windows.azure.com/' - connectionCls = AzureBlobsConnection - hash_type = 'md5' - supports_chunked_encoding = False - ex_blob_type = 'BlockBlob' - - def __init__(self, key, secret=None, secure=True, host=None, port=None, - **kwargs): - self._host_argument_set = bool(host) - - # B64decode() this key and keep it, so that we don't have to do - # so for every request. 
Minor performance improvement - secret = base64.b64decode(b(secret)) - - super(AzureBlobsStorageDriver, self).__init__(key=key, secret=secret, - secure=secure, host=host, - port=port, **kwargs) - - def _ex_connection_class_kwargs(self): - result = {} - - # host argument has precedence - if not self._host_argument_set: - result['host'] = '%s.%s' % (self.key, AZURE_STORAGE_HOST_SUFFIX) - - return result - - def _xml_to_container(self, node): - """ - Converts a container XML node to a container instance - - :param node: XML info of the container - :type node: :class:`xml.etree.ElementTree.Element` - - :return: A container instance - :rtype: :class:`Container` - """ - - name = node.findtext(fixxpath(xpath='Name')) - props = node.find(fixxpath(xpath='Properties')) - metadata = node.find(fixxpath(xpath='Metadata')) - - extra = { - 'url': node.findtext(fixxpath(xpath='Url')), - 'last_modified': node.findtext(fixxpath(xpath='Last-Modified')), - 'etag': props.findtext(fixxpath(xpath='Etag')), - 'lease': { - 'status': props.findtext(fixxpath(xpath='LeaseStatus')), - 'state': props.findtext(fixxpath(xpath='LeaseState')), - 'duration': props.findtext(fixxpath(xpath='LeaseDuration')), - }, - 'meta_data': {} - } - - for meta in metadata.getchildren(): - extra['meta_data'][meta.tag] = meta.text - - return Container(name=name, extra=extra, driver=self) - - def _response_to_container(self, container_name, response): - """ - Converts a HTTP response to a container instance - - :param container_name: Name of the container - :type container_name: ``str`` - - :param response: HTTP Response - :type node: L{} - - :return: A container instance - :rtype: :class:`Container` - """ - - headers = response.headers - extra = { - 'url': 'http://%s%s' % (response.connection.host, - response.connection.action), - 'etag': headers['etag'], - 'last_modified': headers['last-modified'], - 'lease': { - 'status': headers.get('x-ms-lease-status', None), - 'state': headers.get('x-ms-lease-state', None), - 
'duration': headers.get('x-ms-lease-duration', None), - }, - 'meta_data': {} - } - - for key, value in response.headers.items(): - if key.startswith('x-ms-meta-'): - key = key.split('x-ms-meta-')[1] - extra['meta_data'][key] = value - - return Container(name=container_name, extra=extra, driver=self) - - def _xml_to_object(self, container, blob): - """ - Converts a BLOB XML node to an object instance - - :param container: Instance of the container holding the blob - :type: :class:`Container` - - :param blob: XML info of the blob - :type blob: L{} - - :return: An object instance - :rtype: :class:`Object` - """ - - name = blob.findtext(fixxpath(xpath='Name')) - props = blob.find(fixxpath(xpath='Properties')) - metadata = blob.find(fixxpath(xpath='Metadata')) - etag = props.findtext(fixxpath(xpath='Etag')) - size = int(props.findtext(fixxpath(xpath='Content-Length'))) - - extra = { - 'content_type': props.findtext(fixxpath(xpath='Content-Type')), - 'etag': etag, - 'md5_hash': props.findtext(fixxpath(xpath='Content-MD5')), - 'last_modified': props.findtext(fixxpath(xpath='Last-Modified')), - 'url': blob.findtext(fixxpath(xpath='Url')), - 'hash': props.findtext(fixxpath(xpath='Etag')), - 'lease': { - 'status': props.findtext(fixxpath(xpath='LeaseStatus')), - 'state': props.findtext(fixxpath(xpath='LeaseState')), - 'duration': props.findtext(fixxpath(xpath='LeaseDuration')), - }, - 'content_encoding': props.findtext(fixxpath( - xpath='Content-Encoding')), - 'content_language': props.findtext(fixxpath( - xpath='Content-Language')), - 'blob_type': props.findtext(fixxpath(xpath='BlobType')) - } - - if extra['md5_hash']: - value = binascii.hexlify(base64.b64decode(b(extra['md5_hash']))) - value = value.decode('ascii') - extra['md5_hash'] = value - - meta_data = {} - for meta in metadata.getchildren(): - meta_data[meta.tag] = meta.text - - return Object(name=name, size=size, hash=etag, meta_data=meta_data, - extra=extra, container=container, driver=self) - - def 
_response_to_object(self, object_name, container, response): - """ - Converts a HTTP response to an object (from headers) - - :param object_name: Name of the object - :type object_name: ``str`` - - :param container: Instance of the container holding the blob - :type: :class:`Container` - - :param response: HTTP Response - :type node: L{} - - :return: An object instance - :rtype: :class:`Object` - """ - - headers = response.headers - size = int(headers['content-length']) - etag = headers['etag'] - - extra = { - 'url': 'http://%s%s' % (response.connection.host, - response.connection.action), - 'etag': etag, - 'md5_hash': headers.get('content-md5', None), - 'content_type': headers.get('content-type', None), - 'content_language': headers.get('content-language', None), - 'content_encoding': headers.get('content-encoding', None), - 'last_modified': headers['last-modified'], - 'lease': { - 'status': headers.get('x-ms-lease-status', None), - 'state': headers.get('x-ms-lease-state', None), - 'duration': headers.get('x-ms-lease-duration', None), - }, - 'blob_type': headers['x-ms-blob-type'] - } - - if extra['md5_hash']: - value = binascii.hexlify(base64.b64decode(b(extra['md5_hash']))) - value = value.decode('ascii') - extra['md5_hash'] = value - - meta_data = {} - for key, value in response.headers.items(): - if key.startswith('x-ms-meta-'): - key = key.split('x-ms-meta-')[1] - meta_data[key] = value - - return Object(name=object_name, size=size, hash=etag, extra=extra, - meta_data=meta_data, container=container, driver=self) - - def iterate_containers(self): - """ - @inherits: :class:`StorageDriver.iterate_containers` - """ - params = {'comp': 'list', - 'maxresults': RESPONSES_PER_REQUEST, - 'include': 'metadata'} - - while True: - response = self.connection.request('/', params) - if response.status != httplib.OK: - raise LibcloudError('Unexpected status code: %s' % - (response.status), driver=self) - - body = response.parse_body() - containers = 
body.find(fixxpath(xpath='Containers')) - containers = containers.findall(fixxpath(xpath='Container')) - - for container in containers: - yield self._xml_to_container(container) - - params['marker'] = body.findtext('NextMarker') - if not params['marker']: - break - - def iterate_container_objects(self, container): - """ - @inherits: :class:`StorageDriver.iterate_container_objects` - """ - params = {'restype': 'container', - 'comp': 'list', - 'maxresults': RESPONSES_PER_REQUEST, - 'include': 'metadata'} - - container_path = self._get_container_path(container) - - while True: - response = self.connection.request(container_path, - params=params) - - if response.status == httplib.NOT_FOUND: - raise ContainerDoesNotExistError(value=None, - driver=self, - container_name=container.name) - - elif response.status != httplib.OK: - raise LibcloudError('Unexpected status code: %s' % - (response.status), driver=self) - - body = response.parse_body() - blobs = body.find(fixxpath(xpath='Blobs')) - blobs = blobs.findall(fixxpath(xpath='Blob')) - - for blob in blobs: - yield self._xml_to_object(container, blob) - - params['marker'] = body.findtext('NextMarker') - if not params['marker']: - break - - def get_container(self, container_name): - """ - @inherits: :class:`StorageDriver.get_container` - """ - params = {'restype': 'container'} - - container_path = '/%s' % (container_name) - - response = self.connection.request(container_path, params=params, - method='HEAD') - - if response.status == httplib.NOT_FOUND: - raise ContainerDoesNotExistError('Container %s does not exist' % - (container_name), driver=self, - container_name=container_name) - elif response.status != httplib.OK: - raise LibcloudError('Unexpected status code: %s' % - (response.status), driver=self) - - return self._response_to_container(container_name, response) - - def get_object(self, container_name, object_name): - """ - @inherits: :class:`StorageDriver.get_object` - """ - - container = 
self.get_container(container_name=container_name) - object_path = self._get_object_path(container, object_name) - - response = self.connection.request(object_path, method='HEAD') - - if response.status == httplib.OK: - obj = self._response_to_object(object_name, container, response) - return obj - - raise ObjectDoesNotExistError(value=None, driver=self, - object_name=object_name) - - def _get_container_path(self, container): - """ - Return a container path - - :param container: Container instance - :type container: :class:`Container` - - :return: A path for this container. - :rtype: ``str`` - """ - return '/%s' % (container.name) - - def _get_object_path(self, container, object_name): - """ - Return an object's CDN path. - - :param container: Container instance - :type container: :class:`Container` - - :param object_name: Object name - :type object_name: :class:`str` - - :return: A path for this object. - :rtype: ``str`` - """ - container_url = self._get_container_path(container) - object_name_cleaned = urlquote(object_name) - object_path = '%s/%s' % (container_url, object_name_cleaned) - return object_path - - def create_container(self, container_name): - """ - @inherits: :class:`StorageDriver.create_container` - """ - params = {'restype': 'container'} - - container_path = '/%s' % (container_name) - response = self.connection.request(container_path, params=params, - method='PUT') - - if response.status == httplib.CREATED: - return self._response_to_container(container_name, response) - elif response.status == httplib.CONFLICT: - raise ContainerAlreadyExistsError( - value='Container with this name already exists. 
The name must ' - 'be unique among all the containers in the system', - container_name=container_name, driver=self) - elif response.status == httplib.BAD_REQUEST: - raise InvalidContainerNameError(value='Container name contains ' + - 'invalid characters.', - container_name=container_name, - driver=self) - - raise LibcloudError('Unexpected status code: %s' % (response.status), - driver=self) - - def delete_container(self, container): - """ - @inherits: :class:`StorageDriver.delete_container` - """ - # Azure does not check if the container is empty. So, we will do - # a check to ensure that the behaviour is similar to other drivers - for obj in container.iterate_objects(): - raise ContainerIsNotEmptyError( - value='Container must be empty before it can be deleted.', - container_name=container.name, driver=self) - - params = {'restype': 'container'} - container_path = self._get_container_path(container) - - # Note: All the objects in the container must be deleted first - response = self.connection.request(container_path, params=params, - method='DELETE') - - if response.status == httplib.ACCEPTED: - return True - elif response.status == httplib.NOT_FOUND: - raise ContainerDoesNotExistError(value=None, - driver=self, - container_name=container.name) - - return False - - def download_object(self, obj, destination_path, overwrite_existing=False, - delete_on_failure=True): - """ - @inherits: :class:`StorageDriver.download_object` - """ - obj_path = self._get_object_path(obj.container, obj.name) - response = self.connection.request(obj_path, raw=True, data=None) - - return self._get_object(obj=obj, callback=self._save_object, - response=response, - callback_kwargs={ - 'obj': obj, - 'response': response.response, - 'destination_path': destination_path, - 'overwrite_existing': overwrite_existing, - 'delete_on_failure': delete_on_failure}, - success_status_code=httplib.OK) - - def download_object_as_stream(self, obj, chunk_size=None): - """ - @inherits: 
:class:`StorageDriver.download_object_as_stream` - """ - obj_path = self._get_object_path(obj.container, obj.name) - response = self.connection.request(obj_path, raw=True, data=None) - - return self._get_object(obj=obj, callback=read_in_chunks, - response=response, - callback_kwargs={'iterator': response.response, - 'chunk_size': chunk_size}, - success_status_code=httplib.OK) - - def _upload_in_chunks(self, response, data, iterator, object_path, - blob_type, lease, calculate_hash=True): - """ - Uploads data from an interator in fixed sized chunks to S3 - - :param response: Response object from the initial POST request - :type response: :class:`RawResponse` - - :param data: Any data from the initial POST request - :type data: ``str`` - - :param iterator: The generator for fetching the upload data - :type iterator: ``generator`` - - :param object_path: The path of the object to which we are uploading - :type object_name: ``str`` - - :param blob_type: The blob type being uploaded - :type blob_type: ``str`` - - :param lease: The lease object to be used for renewal - :type lease: :class:`AzureBlobLease` - - :keyword calculate_hash: Indicates if we must calculate the data hash - :type calculate_hash: ``bool`` - - :return: A tuple of (status, checksum, bytes transferred) - :rtype: ``tuple`` - """ - - # Get the upload id from the response xml - if response.status != httplib.CREATED: - raise LibcloudError('Error initializing upload. 
Code: %d' % - (response.status), driver=self) - - data_hash = None - if calculate_hash: - data_hash = self._get_hash_function() - - bytes_transferred = 0 - count = 1 - chunks = [] - headers = {} - - lease.update_headers(headers) - - if blob_type == 'BlockBlob': - params = {'comp': 'block'} - else: - params = {'comp': 'page'} - - # Read the input data in chunk sizes suitable for AWS - for data in read_in_chunks(iterator, AZURE_CHUNK_SIZE): - data = b(data) - content_length = len(data) - offset = bytes_transferred - bytes_transferred += content_length - - if calculate_hash: - data_hash.update(data) - - chunk_hash = self._get_hash_function() - chunk_hash.update(data) - chunk_hash = base64.b64encode(b(chunk_hash.digest())) - - headers['Content-MD5'] = chunk_hash.decode('utf-8') - headers['Content-Length'] = content_length - - if blob_type == 'BlockBlob': - # Block id can be any unique string that is base64 encoded - # A 10 digit number can hold the max value of 50000 blocks - # that are allowed for azure - block_id = base64.b64encode(b('%10d' % (count))) - block_id = block_id.decode('utf-8') - params['blockid'] = block_id - - # Keep this data for a later commit - chunks.append(block_id) - else: - headers['x-ms-page-write'] = 'update' - headers['x-ms-range'] = 'bytes=%d-%d' % \ - (offset, (bytes_transferred - 1)) - - # Renew lease before updating - lease.renew() - - resp = self.connection.request(object_path, method='PUT', - data=data, headers=headers, - params=params) - - if resp.status != httplib.CREATED: - resp.parse_error() - raise LibcloudError('Error uploading chunk %d. Code: %d' % - (count, resp.status), driver=self) - - count += 1 - - if calculate_hash: - data_hash = data_hash.hexdigest() - - if blob_type == 'BlockBlob': - self._commit_blocks(object_path, chunks, lease) - - # The Azure service does not return a hash immediately for - # chunked uploads. 
# NOTE(review): this chunk is the interior of the Azure Blobs storage driver
# class. The tail of _upload_in_chunks that preceded _commit_blocks begins
# outside this chunk and is not reproduced here.

def _commit_blocks(self, object_path, chunks, lease):
    """
    Make a final commit of previously uploaded (uncommitted) blocks so
    Azure assembles them into the finished block blob.

    :param object_path: Server side object path.
    :type object_path: ``str``

    :param chunks: Ordered list of base64-encoded block ids that were
                   uploaded as uncommitted blocks.
    :type chunks: ``list`` of ``str``

    :param lease: Lease guarding the object; its headers are attached to
                  the request and it is renewed right before the commit.
    :type lease: :class:`AzureBlobLease`
    """
    # Build the <BlockList><Uncommitted>...</Uncommitted></BlockList>
    # payload expected by the Put Block List operation.
    root = Element('BlockList')

    for block_id in chunks:
        part = SubElement(root, 'Uncommitted')
        part.text = str(block_id)

    data = tostring(root)
    params = {'comp': 'blocklist'}
    headers = {}

    lease.update_headers(headers)
    lease.renew()

    response = self.connection.request(object_path, data=data,
                                       params=params, headers=headers,
                                       method='PUT')

    if response.status != httplib.CREATED:
        raise LibcloudError('Error in blocklist commit', driver=self)

def _check_values(self, blob_type, object_size):
    """
    Check if the extension arguments are valid.

    :param blob_type: The blob type that is being uploaded.
    :type blob_type: ``str``

    :param object_size: The (max) size of the object being uploaded.
    :type object_size: ``int``

    :raises: :class:`LibcloudError` if the blob type is unknown, or if a
             page blob has no size or a size not aligned to the page
             boundary.
    """
    if blob_type not in ['BlockBlob', 'PageBlob']:
        raise LibcloudError('Invalid blob type', driver=self)

    if blob_type == 'PageBlob':
        if not object_size:
            raise LibcloudError('Max blob size is mandatory for page blob',
                                driver=self)

        # Page blobs must be sized in whole pages.
        if object_size % AZURE_PAGE_CHUNK_SIZE:
            raise LibcloudError('Max blob size is not aligned to '
                                'page boundary', driver=self)

def upload_object(self, file_path, container, object_name, extra=None,
                  verify_hash=True, ex_blob_type=None, ex_use_lease=False):
    """
    Upload an object currently located on a disk.

    @inherits: :class:`StorageDriver.upload_object`

    :param ex_blob_type: Storage class
    :type ex_blob_type: ``str``

    :param ex_use_lease: Indicates if we must take a lease before upload
    :type ex_use_lease: ``bool``
    """
    if ex_blob_type is None:
        ex_blob_type = self.ex_blob_type

    # Get the size of the file
    file_size = os.stat(file_path).st_size

    # The presumed size of the object
    object_size = file_size

    self._check_values(ex_blob_type, file_size)

    # FIX: the original used the Python-2-only `file()` builtin, which
    # raises NameError on Python 3; `open()` is equivalent on both.
    with open(file_path, 'rb') as file_handle:
        iterator = iter(file_handle)

        # If size is greater than 64MB or type is Page, upload in chunks
        if ex_blob_type == 'PageBlob' or file_size > AZURE_BLOCK_MAX_SIZE:
            # For chunked upload of block blobs, the initial size must
            # be 0.
            if ex_blob_type == 'BlockBlob':
                object_size = None

            object_path = self._get_object_path(container, object_name)

            upload_func = self._upload_in_chunks
            upload_func_kwargs = {'iterator': iterator,
                                  'object_path': object_path,
                                  'blob_type': ex_blob_type,
                                  'lease': None}
        else:
            upload_func = self._stream_data
            upload_func_kwargs = {'iterator': iterator,
                                  'chunked': False,
                                  'calculate_hash': verify_hash}

        return self._put_object(container=container,
                                object_name=object_name,
                                object_size=object_size,
                                upload_func=upload_func,
                                upload_func_kwargs=upload_func_kwargs,
                                file_path=file_path, extra=extra,
                                verify_hash=verify_hash,
                                blob_type=ex_blob_type,
                                use_lease=ex_use_lease)

def upload_object_via_stream(self, iterator, container, object_name,
                             verify_hash=False, extra=None,
                             ex_use_lease=False, ex_blob_type=None,
                             ex_page_blob_size=None):
    """
    @inherits: :class:`StorageDriver.upload_object_via_stream`

    :param ex_blob_type: Storage class
    :type ex_blob_type: ``str``

    :param ex_page_blob_size: The maximum size to which the
        page blob can grow to
    :type ex_page_blob_size: ``int``

    :param ex_use_lease: Indicates if we must take a lease before upload
    :type ex_use_lease: ``bool``
    """
    if ex_blob_type is None:
        ex_blob_type = self.ex_blob_type

    self._check_values(ex_blob_type, ex_page_blob_size)

    object_path = self._get_object_path(container, object_name)

    # Streams are always uploaded in chunks; the lease slot is filled in
    # later by _put_object once the lease (if any) has been acquired.
    upload_func = self._upload_in_chunks
    upload_func_kwargs = {'iterator': iterator,
                          'object_path': object_path,
                          'blob_type': ex_blob_type,
                          'lease': None}

    return self._put_object(container=container,
                            object_name=object_name,
                            object_size=ex_page_blob_size,
                            upload_func=upload_func,
                            upload_func_kwargs=upload_func_kwargs,
                            extra=extra, verify_hash=verify_hash,
                            blob_type=ex_blob_type,
                            use_lease=ex_use_lease)

def delete_object(self, obj):
    """
    @inherits: :class:`StorageDriver.delete_object`

    :return: ``True`` when Azure accepted the delete; ``False`` for any
             other non-404 response.
    :raises: :class:`ObjectDoesNotExistError` if the object is missing.
    """
    object_path = self._get_object_path(obj.container, obj.name)
    response = self.connection.request(object_path, method='DELETE')

    if response.status == httplib.ACCEPTED:
        return True
    elif response.status == httplib.NOT_FOUND:
        raise ObjectDoesNotExistError(value=None, driver=self,
                                      object_name=obj.name)

    return False

def _update_metadata(self, headers, meta_data):
    """
    Update the given metadata in the headers.

    Each metadata key is prefixed with ``x-ms-meta-`` as required by the
    Azure Blob service.

    :param headers: The headers dictionary to be updated
    :type headers: ``dict``

    :param meta_data: Metadata key value pairs
    :type meta_data: ``dict``
    """
    for key, value in list(meta_data.items()):
        key = 'x-ms-meta-%s' % (key)
        headers[key] = value

def _prepare_upload_headers(self, object_name, object_size,
                            extra, meta_data, blob_type):
    """
    Prepare headers for uploading an object.

    :param object_name: The full name of the object being updated
        (currently unused; kept for interface stability).
    :type object_name: ``str``

    :param object_size: The size of the object. In case of PageBlobs,
        this indicates the maximum size the blob can grow to
    :type object_size: ``int``

    :param extra: Extra control data for the upload (currently unused;
        kept for interface stability).
    :type extra: ``dict``

    :param meta_data: Metadata key value pairs
    :type meta_data: ``dict``

    :param blob_type: Page or Block blob type
    :type blob_type: ``str``

    :return: The prepared headers dictionary.
    :rtype: ``dict``
    """
    headers = {}

    if blob_type is None:
        blob_type = self.ex_blob_type

    headers['x-ms-blob-type'] = blob_type

    self._update_metadata(headers, meta_data)

    if object_size is not None:
        headers['Content-Length'] = object_size

    if blob_type == 'PageBlob':
        # A page blob is created empty (Content-Length 0); its maximum
        # size is declared via x-ms-blob-content-length instead.
        headers['Content-Length'] = 0
        headers['x-ms-blob-content-length'] = object_size

    return headers

def _put_object(self, container, object_name, object_size, upload_func,
                upload_func_kwargs, file_path=None, extra=None,
                verify_hash=True, blob_type=None, use_lease=False):
    """
    Control function that does the real job of uploading data to a blob.

    Acquires an optional lease, delegates the transfer to ``upload_func``
    and validates the server-side MD5 hash when available.
    """
    extra = extra or {}
    meta_data = extra.get('meta_data', {})
    content_type = extra.get('content_type', None)

    headers = self._prepare_upload_headers(object_name, object_size,
                                           extra, meta_data, blob_type)

    object_path = self._get_object_path(container, object_name)

    # Get a lease if required and do the operations
    with AzureBlobLease(self, object_path, use_lease) as lease:
        if 'lease' in upload_func_kwargs:
            upload_func_kwargs['lease'] = lease

        lease.update_headers(headers)

        iterator = iter('')
        result_dict = self._upload_object(object_name, content_type,
                                          upload_func, upload_func_kwargs,
                                          object_path, headers=headers,
                                          file_path=file_path,
                                          iterator=iterator)

        response = result_dict['response']
        bytes_transferred = result_dict['bytes_transferred']
        data_hash = result_dict['data_hash']
        headers = response.headers
        response = response.response

        if response.status != httplib.CREATED:
            raise LibcloudError(
                'Unexpected status code, status_code=%s' % (response.status),
                driver=self)

        server_hash = headers['content-md5']

        if server_hash:
            # Azure returns the MD5 base64-encoded; convert it to the
            # hex digest form produced locally before comparing.
            server_hash = binascii.hexlify(base64.b64decode(b(server_hash)))
            server_hash = server_hash.decode('utf-8')
        else:
            # TODO: HACK - We could poll the object for a while and get
            # the hash
            pass

        if (verify_hash and server_hash and data_hash != server_hash):
            raise ObjectHashMismatchError(
                value='MD5 hash checksum does not match',
                object_name=object_name, driver=self)

        return Object(name=object_name, size=bytes_transferred,
                      hash=headers['etag'], extra=None,
                      meta_data=meta_data, container=container,
                      driver=self)

def ex_set_object_metadata(self, obj, meta_data):
    """
    Set metadata for an object.

    :param obj: The blob object
    :type obj: :class:`Object`

    :param meta_data: Metadata key value pairs
    :type meta_data: ``dict``
    """
    object_path = self._get_object_path(obj.container, obj.name)
    params = {'comp': 'metadata'}
    headers = {}

    self._update_metadata(headers, meta_data)

    response = self.connection.request(object_path, method='PUT',
                                       params=params,
                                       headers=headers)

    if response.status != httplib.OK:
        response.parse_error('Setting metadata')