Duncan McGreggor has proposed merging lp:~oubiwann/txaws/416109-arbitrary-endpoints into lp:txaws.
--
https://code.launchpad.net/~oubiwann/txaws/416109-arbitrary-endpoints/+merge/10671
Your team txAWS Team is subscribed to branch lp:txaws.
=== modified file 'txaws/client/gui/gtk.py' --- txaws/client/gui/gtk.py 2009-08-18 22:53:53 +0000 +++ txaws/client/gui/gtk.py 2009-08-25 16:04:05 +0000 @@ -9,6 +9,7 @@ import gtk from txaws.credentials import AWSCredentials +from txaws.service import AWSServiceRegion __all__ = ['main'] @@ -30,6 +31,7 @@ creds = AWSCredentials() except ValueError: creds = self.from_gnomekeyring() + self.region = AWSServiceRegion(creds) self.create_client(creds) menu = ''' <ui> @@ -55,9 +57,8 @@ self.connect('popup-menu', self.on_popup_menu) def create_client(self, creds): - from txaws.ec2.client import EC2Client if creds is not None: - self.client = EC2Client(creds=creds) + self.client = self.region.get_ec2_client() self.on_activate(None) else: # waiting on user entered credentials. @@ -134,6 +135,7 @@ key_id = content.get_children()[0].get_children()[1].get_text() secret_key = content.get_children()[1].get_children()[1].get_text() creds = AWSCredentials(access_key=key_id, secret_key=secret_key) + region = AWSServiceRegion(creds=creds) self.create_client(creds) gnomekeyring.item_create_sync( None, @@ -196,7 +198,10 @@ """ if reactor is None: from twisted.internet import gtk2reactor - gtk2reactor.install() + try: + gtk2reactor.install() + except AssertionError: + pass from twisted.internet import reactor status = AWSStatusIcon(reactor) gobject.set_application_name('aws-status') === modified file 'txaws/credentials.py' --- txaws/credentials.py 2009-08-17 11:18:56 +0000 +++ txaws/credentials.py 2009-08-25 14:36:20 +0000 @@ -5,32 +5,37 @@ import os -from txaws.util import * +from txaws.util import hmac_sha1 __all__ = ['AWSCredentials'] +ENV_ACCESS_KEY = "AWS_ACCESS_KEY_ID" +ENV_SECRET_KEY = "AWS_SECRET_ACCESS_KEY" + + class AWSCredentials(object): - def __init__(self, access_key=None, secret_key=None): + def __init__(self, access_key="", secret_key=""): """Create an AWSCredentials object. - :param access_key: The access key to use. 
If None the environment + @param access_key: The access key to use. If None the environment variable AWS_ACCESS_KEY_ID is consulted. - :param secret_key: The secret key to use. If None the environment + @param secret_key: The secret key to use. If None the environment variable AWS_SECRET_ACCESS_KEY is consulted. """ + self.access_key = access_key self.secret_key = secret_key - if self.secret_key is None: - self.secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY') - if self.secret_key is None: - raise ValueError('Could not find AWS_SECRET_ACCESS_KEY') - self.access_key = access_key - if self.access_key is None: - self.access_key = os.environ.get('AWS_ACCESS_KEY_ID') - if self.access_key is None: - raise ValueError('Could not find AWS_ACCESS_KEY_ID') + if not self.access_key: + self.access_key = os.environ.get(ENV_ACCESS_KEY) + if not self.access_key: + raise ValueError("Could not find %s" % ENV_ACCESS_KEY) + # perform checks for secret key + if not self.secret_key: + self.secret_key = os.environ.get(ENV_SECRET_KEY) + if not self.secret_key: + raise ValueError("Could not find %s" % ENV_SECRET_KEY) def sign(self, bytes): """Sign some bytes.""" === modified file 'txaws/ec2/client.py' --- txaws/ec2/client.py 2009-08-21 03:26:35 +0000 +++ txaws/ec2/client.py 2009-08-25 16:04:05 +0000 @@ -8,7 +8,8 @@ from twisted.web.client import getPage -from txaws import credentials +from txaws.credentials import AWSCredentials +from txaws.service import AWSServiceEndpoint from txaws.util import iso8601time, XML @@ -77,16 +78,16 @@ name_space = '{http://ec2.amazonaws.com/doc/2008-12-01/}' - def __init__(self, creds=None, query_factory=None): + def __init__(self, creds=None, endpoint=None, query_factory=None): """Create an EC2Client. - @param creds: Explicit credentials to use. If None, credentials are - inferred as per txaws.credentials.AWSCredentials. + @param creds: User authentication credentials to use. + @param endpoint: The service endpoint URI. 
+ @param query_factory: The class or function that produces a query + object for making requests to the EC2 service. """ - if creds is None: - self.creds = credentials.AWSCredentials() - else: - self.creds = creds + self.creds = creds or AWSCredentials() + self.endpoint = endpoint or AWSServiceEndpoint() if query_factory is None: self.query_factory = Query else: @@ -94,7 +95,7 @@ def describe_instances(self): """Describe current instances.""" - q = self.query_factory('DescribeInstances', self.creds) + q = self.query_factory('DescribeInstances', self.creds, self.endpoint) d = q.submit() return d.addCallback(self._parse_instances) @@ -177,7 +178,8 @@ instanceset = {} for pos, instance_id in enumerate(instance_ids): instanceset["InstanceId.%d" % (pos+1)] = instance_id - q = self.query_factory('TerminateInstances', self.creds, instanceset) + q = self.query_factory('TerminateInstances', self.creds, self.endpoint, + instanceset) d = q.submit() return d.addCallback(self._parse_terminate_instances) @@ -200,24 +202,24 @@ class Query(object): """A query that may be submitted to EC2.""" - def __init__(self, action, creds, other_params=None, time_tuple=None): + def __init__(self, action, creds, endpoint, other_params=None, + time_tuple=None): """Create a Query to submit to EC2.""" + self.creds = creds + self.endpoint = endpoint # Require params (2008-12-01 API): # Version, SignatureVersion, SignatureMethod, Action, AWSAccessKeyId, # Timestamp || Expires, Signature, - self.params = {'Version': '2008-12-01', + self.params = { + 'Version': '2008-12-01', 'SignatureVersion': '2', 'SignatureMethod': 'HmacSHA1', 'Action': action, - 'AWSAccessKeyId': creds.access_key, + 'AWSAccessKeyId': self.creds.access_key, 'Timestamp': iso8601time(time_tuple), } if other_params: self.params.update(other_params) - self.method = 'GET' - self.host = 'ec2.amazonaws.com' - self.uri = '/' - self.creds = creds def canonical_query_params(self): """Return the canonical query params (used in signing).""" @@ 
-230,18 +232,19 @@ """Encode a_string as per the canonicalisation encoding rules. See the AWS dev reference page 90 (2008-12-01 version). - :return: a_string encoded. + @return: a_string encoded. """ return quote(a_string, safe='~') def signing_text(self): """Return the text to be signed when signing the query.""" - result = "%s\n%s\n%s\n%s" % (self.method, self.host, self.uri, - self.canonical_query_params()) + result = "%s\n%s\n%s\n%s" % (self.endpoint.method, self.endpoint.host, + self.endpoint.path, + self.canonical_query_params()) return result def sign(self): - """Sign this query using its built in credentials. + """Sign this query using its built in creds. This prepares it to be sent, and should be done as the last step before submitting the query. Signing is done automatically - this is a public @@ -256,9 +259,9 @@ def submit(self): """Submit this query. - :return: A deferred from twisted.web.client.getPage + @return: A deferred from twisted.web.client.getPage """ self.sign() - url = 'http://%s%s?%s' % (self.host, self.uri, - self.canonical_query_params()) - return getPage(url, method=self.method) + url = "%s?%s" % (self.endpoint.get_uri(), + self.canonical_query_params()) + return getPage(url, method=self.endpoint.method) === modified file 'txaws/ec2/tests/test_client.py' --- txaws/ec2/tests/test_client.py 2009-08-21 03:26:35 +0000 +++ txaws/ec2/tests/test_client.py 2009-08-25 16:04:05 +0000 @@ -7,6 +7,7 @@ from txaws.credentials import AWSCredentials from txaws.ec2 import client +from txaws.service import AWSServiceEndpoint, EC2_ENDPOINT_US from txaws.tests import TXAWSTestCase @@ -117,7 +118,7 @@ self.assertEquals(instance.ramdisk_id, "id4") -class TestEC2Client(TXAWSTestCase): +class EC2ClientTestCase(TXAWSTestCase): def test_init_no_creds(self): os.environ['AWS_SECRET_ACCESS_KEY'] = 'foo' @@ -129,7 +130,7 @@ self.assertRaises(ValueError, client.EC2Client) def test_init_explicit_creds(self): - creds = 'foo' + creds = AWSCredentials("foo", "bar") ec2 = 
client.EC2Client(creds=creds) self.assertEqual(creds, ec2.creds) @@ -162,33 +163,40 @@ def test_parse_reservation(self): - ec2 = client.EC2Client(creds='foo') + creds = AWSCredentials("foo", "bar") + ec2 = client.EC2Client(creds=creds) results = ec2._parse_instances(sample_describe_instances_result) self.check_parsed_instances(results) def test_describe_instances(self): class StubQuery(object): - def __init__(stub, action, creds): + def __init__(stub, action, creds, endpoint): self.assertEqual(action, 'DescribeInstances') - self.assertEqual('foo', creds) + self.assertEqual(creds.access_key, "foo") + self.assertEqual(creds.secret_key, "bar") def submit(self): return succeed(sample_describe_instances_result) - ec2 = client.EC2Client(creds='foo', query_factory=StubQuery) + creds = AWSCredentials("foo", "bar") + ec2 = client.EC2Client(creds, query_factory=StubQuery) d = ec2.describe_instances() d.addCallback(self.check_parsed_instances) return d def test_terminate_instances(self): class StubQuery(object): - def __init__(stub, action, creds, other_params): + def __init__(stub, action, creds, endpoint, other_params): self.assertEqual(action, 'TerminateInstances') - self.assertEqual('foo', creds) + self.assertEqual(creds.access_key, "foo") + self.assertEqual(creds.secret_key, "bar") self.assertEqual( {'InstanceId.1': 'i-1234', 'InstanceId.2': 'i-5678'}, other_params) def submit(self): return succeed(sample_terminate_instances_result) - ec2 = client.EC2Client(creds='foo', query_factory=StubQuery) + creds = AWSCredentials("foo", "bar") + endpoint = AWSServiceEndpoint(uri=EC2_ENDPOINT_US) + ec2 = client.EC2Client(creds=creds, endpoint=endpoint, + query_factory=StubQuery) d = ec2.terminate_instances('i-1234', 'i-5678') def check_transition(changes): self.assertEqual([('i-1234', 'running', 'shutting-down'), @@ -196,14 +204,15 @@ return d -class TestQuery(TXAWSTestCase): +class QueryTestCase(TXAWSTestCase): def setUp(self): TXAWSTestCase.setUp(self) self.creds = 
AWSCredentials('foo', 'bar') + self.endpoint = AWSServiceEndpoint(uri=EC2_ENDPOINT_US) def test_init_minimum(self): - query = client.Query('DescribeInstances', self.creds) + query = client.Query('DescribeInstances', self.creds, self.endpoint) self.assertTrue('Timestamp' in query.params) del query.params['Timestamp'] self.assertEqual( @@ -221,7 +230,7 @@ self.assertRaises(TypeError, client.Query, None) def test_init_other_args_are_params(self): - query = client.Query('DescribeInstances', self.creds, + query = client.Query('DescribeInstances', self.creds, self.endpoint, {'InstanceId.0': '12345'}, time_tuple=(2007,11,12,13,14,15,0,0,0)) self.assertEqual( @@ -235,7 +244,7 @@ query.params) def test_sorted_params(self): - query = client.Query('DescribeInstances', self.creds, + query = client.Query('DescribeInstances', self.creds, self.endpoint, {'fun': 'games'}, time_tuple=(2007,11,12,13,14,15,0,0,0)) self.assertEqual([ @@ -251,16 +260,16 @@ def test_encode_unreserved(self): all_unreserved = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ' 'abcdefghijklmnopqrstuvwxyz0123456789-_.~') - query = client.Query('DescribeInstances', self.creds) + query = client.Query('DescribeInstances', self.creds, self.endpoint) self.assertEqual(all_unreserved, query.encode(all_unreserved)) def test_encode_space(self): """This may be just 'url encode', but the AWS manual isn't clear.""" - query = client.Query('DescribeInstances', self.creds) + query = client.Query('DescribeInstances', self.creds, self.endpoint) self.assertEqual('a%20space', query.encode('a space')) def test_canonical_query(self): - query = client.Query('DescribeInstances', self.creds, + query = client.Query('DescribeInstances', self.creds, self.endpoint, {'fu n': 'g/ames', 'argwithnovalue':'', 'InstanceId.1': 'i-1234'}, time_tuple=(2007,11,12,13,14,15,0,0,0)) @@ -272,17 +281,17 @@ self.assertEqual(expected_query, query.canonical_query_params()) def test_signing_text(self): - query = client.Query('DescribeInstances', self.creds, + query = 
client.Query('DescribeInstances', self.creds, self.endpoint, time_tuple=(2007,11,12,13,14,15,0,0,0)) - signing_text = ('GET\nec2.amazonaws.com\n/\n' + signing_text = ('GET\n%s\n/\n' % self.endpoint.host + 'AWSAccessKeyId=foo&Action=DescribeInstances&' 'SignatureMethod=HmacSHA1&SignatureVersion=2&' 'Timestamp=2007-11-12T13%3A14%3A15Z&Version=2008-12-01') self.assertEqual(signing_text, query.signing_text()) def test_sign(self): - query = client.Query('DescribeInstances', self.creds, + query = client.Query('DescribeInstances', self.creds, self.endpoint, time_tuple=(2007,11,12,13,14,15,0,0,0)) query.sign() - self.assertEqual('4hEtLuZo9i6kuG3TOXvRQNOrE/U=', + self.assertEqual('JuCpwFA2H4OVF3Ql/lAQs+V6iMc=', query.params['Signature']) === added file 'txaws/service.py' --- txaws/service.py 1970-01-01 00:00:00 +0000 +++ txaws/service.py 2009-08-25 16:04:05 +0000 @@ -0,0 +1,100 @@ +# Copyright (C) 2009 Duncan McGreggor <dun...@canonical.com> +# Copyright (C) 2009 Robert Collins <robe...@robertcollins.net> +# Licenced under the txaws licence available at /LICENSE in the txaws source. + +import os + +from twisted.web.client import _parse + + + +__all__ = ["AWSServiceEndpoint", "AWSServiceRegion", "REGION_US", "REGION_EU"] + + +REGION_US = "US" +REGION_EU = "EU" +EC2_ENDPOINT_US = "https://us-east-1.ec2.amazonaws.com/" +EC2_ENDPOINT_EU = "https://eu-west-1.ec2.amazonaws.com/" +DEFAULT_PORT = 80 + + +class AWSServiceEndpoint(object): + """ + @param uri: The URL for the service. + @param method: The HTTP method used when accessing a service. 
+ """ + + def __init__(self, uri="", method="GET"): + self.host = "" + self.port = DEFAULT_PORT + self.path = "/" + self.method = method + self._parse_uri(uri) + if not self.scheme: + self.scheme = "http" + + def _parse_uri(self, uri): + scheme, host, port, path = _parse( + str(uri), defaultPort=DEFAULT_PORT) + self.scheme = scheme + self.host = host + self.port = port + self.path = path + + def set_path(self, path): + self.path = path + + def get_uri(self): + """Get a URL representation of the service.""" + uri = "%s://%s" % (self.scheme, self.host) + if self.port and self.port != DEFAULT_PORT: + uri = "%s:%s" % (uri, self.port) + return uri + self.path + + +class AWSServiceRegion(object): + """ + This object represents a collection of client factories that use the same + credentials. With Amazon, this collection is associated with a region + (e.g., US or EU). + """ + def __init__(self, creds=None, region=REGION_US): + self.creds = creds + self._clients = {} + if region == REGION_US: + ec2_endpoint = EC2_ENDPOINT_US + elif region == REGION_EU: + ec2_endpoint = EC2_ENDPOINT_EU + self.ec2_endpoint = AWSServiceEndpoint(uri=ec2_endpoint) + + def get_client(self, cls, purge_cache=False, *args, **kwds): + """ + This is a general method for getting a client: if present, it is pulled + from the cache; if not, a new one is instantiated and then put into the + cache. This method should not be called directly, but rather by other + client-specific methods (e.g., get_ec2_client). 
+ """ + key = str(cls) + str(args) + str(kwds) + instance = self._clients.get(key) + if purge_cache or not instance: + instance = cls(*args, **kwds) + self._clients[key] = instance + return instance + + def get_ec2_client(self, creds=None): + from txaws.ec2.client import EC2Client + + if creds: + self.creds = creds + return self.get_client(EC2Client, creds=creds, + endpoint=self.ec2_endpoint, query_factory=None) + + def get_s3_client(self): + raise NotImplementedError + + def get_simpledb_client(self): + raise NotImplementedError + + def get_sqs_client(self): + raise NotImplementedError + === modified file 'txaws/storage/client.py' --- txaws/storage/client.py 2009-08-20 12:15:12 +0000 +++ txaws/storage/client.py 2009-08-21 14:50:25 +0000 @@ -25,19 +25,18 @@ class S3Request(object): def __init__(self, verb, bucket=None, object_name=None, data='', - content_type=None, - metadata={}, root_uri='https://s3.amazonaws.com', creds=None): + content_type=None, metadata={}, creds=None, endpoint=None): self.verb = verb self.bucket = bucket self.object_name = object_name self.data = data self.content_type = content_type self.metadata = metadata - self.root_uri = root_uri self.creds = creds + self.endpoint = endpoint or self.get_uri() self.date = datetimeToString() - def get_uri_path(self): + def get_path(self): path = '/' if self.bucket is not None: path += self.bucket @@ -46,7 +45,8 @@ return path def get_uri(self): - return self.root_uri + self.get_uri_path() + self.endpoint.set_path(self.get_path()) + return self.endpoint.get_uri() def get_headers(self): headers = {'Content-Length': len(self.data), @@ -66,7 +66,7 @@ return headers def get_canonicalized_resource(self): - return self.get_uri_path() + return self.get_path() def get_canonicalized_amz_headers(self, headers): result = '' @@ -76,12 +76,12 @@ return ''.join('%s:%s\n' % (name, value) for name, value in headers) def get_signature(self, headers): - text = self.verb + '\n' - text += headers.get('Content-MD5', '') + '\n' 
- text += headers.get('Content-Type', '') + '\n' - text += headers.get('Date', '') + '\n' - text += self.get_canonicalized_amz_headers(headers) - text += self.get_canonicalized_resource() + text = (self.verb + '\n' + + headers.get('Content-MD5', '') + '\n' + + headers.get('Content-Type', '') + '\n' + + headers.get('Date', '') + '\n' + + self.get_canonicalized_amz_headers(headers) + + self.get_canonicalized_resource()) return self.creds.sign(text) def submit(self): @@ -94,20 +94,21 @@ class S3(object): - root_uri = 'https://s3.amazonaws.com/' request_factory = S3Request - def __init__(self, creds): + def __init__(self, creds, endpoint): self.creds = creds + self.endpoint = endpoint def make_request(self, *a, **kw): """ Create a request with the arguments passed in. - This uses the request_factory attribute, adding the credentials to the - arguments passed in. + This uses the request_factory attribute, adding the creds and endpoint + to the arguments passed in. """ - return self.request_factory(creds=self.creds, *a, **kw) + return self.request_factory(creds=self.creds, endpoint=self.endpoint, + *a, **kw) def _parse_bucket_list(self, response): """ === modified file 'txaws/storage/tests/test_client.py' --- txaws/storage/tests/test_client.py 2009-08-20 12:15:12 +0000 +++ txaws/storage/tests/test_client.py 2009-08-21 14:50:25 +0000 @@ -5,6 +5,7 @@ from twisted.internet.defer import succeed from txaws.credentials import AWSCredentials +from txaws.service import AWSServiceEndpoint from txaws.storage.client import S3, S3Request from txaws.tests import TXAWSTestCase from txaws.util import calculate_md5 @@ -18,10 +19,10 @@ return succeed('') -class RequestTests(TXAWSTestCase): +class RequestTestCase(TXAWSTestCase): - creds = AWSCredentials(access_key='0PN5J17HBGZHT7JJ3X82', - secret_key='uV3F3YluFJax1cknvbcGwgjvx4QpvB+leU8dUj2o') + creds = AWSCredentials(access_key='fookeyid', secret_key='barsecretkey') + endpoint = AWSServiceEndpoint("https://s3.amazonaws.com/") def 
test_objectRequest(self): """ @@ -31,18 +32,22 @@ DIGEST = 'zhdB6gwvocWv/ourYUWMxA==' request = S3Request('PUT', 'somebucket', 'object/name/here', DATA, - content_type='text/plain', metadata={'foo': 'bar'}) + content_type='text/plain', metadata={'foo': 'bar'}, + creds=self.creds, endpoint=self.endpoint) + request.get_signature = lambda headers: "TESTINGSIG=" self.assertEqual(request.verb, 'PUT') self.assertEqual( request.get_uri(), 'https://s3.amazonaws.com/somebucket/object/name/here') headers = request.get_headers() self.assertNotEqual(headers.pop('Date'), '') - self.assertEqual(headers, - {'Content-Type': 'text/plain', - 'Content-Length': len(DATA), - 'Content-MD5': DIGEST, - 'x-amz-meta-foo': 'bar'}) + self.assertEqual( + headers, { + 'Authorization': 'AWS fookeyid:TESTINGSIG=', + 'Content-Type': 'text/plain', + 'Content-Length': len(DATA), + 'Content-MD5': DIGEST, + 'x-amz-meta-foo': 'bar'}) self.assertEqual(request.data, 'objectData') def test_bucketRequest(self): @@ -51,22 +56,27 @@ """ DIGEST = '1B2M2Y8AsgTpgAmY7PhCfg==' - request = S3Request('GET', 'somebucket') + request = S3Request('GET', 'somebucket', creds=self.creds, + endpoint=self.endpoint) + request.get_signature = lambda headers: "TESTINGSIG=" self.assertEqual(request.verb, 'GET') self.assertEqual( request.get_uri(), 'https://s3.amazonaws.com/somebucket') headers = request.get_headers() self.assertNotEqual(headers.pop('Date'), '') - self.assertEqual(headers, - {'Content-Length': 0, - 'Content-MD5': DIGEST}) + self.assertEqual( + headers, { + 'Authorization': 'AWS fookeyid:TESTINGSIG=', + 'Content-Length': 0, + 'Content-MD5': DIGEST}) self.assertEqual(request.data, '') def test_submit(self): """ Submitting the request should invoke getPage correctly. 
""" - request = StubbedS3Request('GET', 'somebucket') + request = StubbedS3Request('GET', 'somebucket', creds=self.creds, + endpoint=self.endpoint) def _postCheck(result): self.assertEqual(result, '') @@ -80,13 +90,14 @@ return request.submit().addCallback(_postCheck) def test_authenticationTestCases(self): - req = S3Request('GET', creds=self.creds) - req.date = 'Wed, 28 Mar 2007 01:29:59 +0000' + request = S3Request('GET', creds=self.creds, endpoint=self.endpoint) + request.get_signature = lambda headers: "TESTINGSIG=" + request.date = 'Wed, 28 Mar 2007 01:29:59 +0000' - headers = req.get_headers() + headers = request.get_headers() self.assertEqual( headers['Authorization'], - 'AWS 0PN5J17HBGZHT7JJ3X82:jF7L3z/FTV47vagZzhKupJ9oNig=') + 'AWS fookeyid:TESTINGSIG=') class InertRequest(S3Request): @@ -153,16 +164,18 @@ TXAWSTestCase.setUp(self) self.creds = AWSCredentials( access_key='accessKey', secret_key='secretKey') - self.s3 = TestableS3(creds=self.creds) + self.endpoint = AWSServiceEndpoint() + self.s3 = TestableS3(creds=self.creds, endpoint=self.endpoint) def test_make_request(self): """ - Test that make_request passes in the service credentials. + Test that make_request passes in the credentials object. 
""" marker = object() def _cb(*a, **kw): self.assertEqual(kw['creds'], self.creds) + self.assertEqual(kw['endpoint'], self.endpoint) return marker self.s3.request_factory = _cb === modified file 'txaws/tests/test_credentials.py' --- txaws/tests/test_credentials.py 2009-08-20 12:15:12 +0000 +++ txaws/tests/test_credentials.py 2009-08-25 14:36:20 +0000 @@ -5,37 +5,49 @@ from twisted.trial.unittest import TestCase -from txaws.credentials import AWSCredentials +from txaws.credentials import AWSCredentials, ENV_ACCESS_KEY, ENV_SECRET_KEY +from txaws.tests import TXAWSTestCase + from txaws.tests import TXAWSTestCase class TestCredentials(TXAWSTestCase): + def setUp(self): + self.addCleanup(self.clean_environment) + + def clean_environment(self): + if os.environ.has_key(ENV_ACCESS_KEY): + del os.environ[ENV_ACCESS_KEY] + if os.environ.has_key(ENV_SECRET_KEY): + del os.environ[ENV_SECRET_KEY] + def test_no_access_errors(self): - # Without anything in os.environ, AWSCredentials() blows up - os.environ['AWS_SECRET_ACCESS_KEY'] = 'foo' - self.assertRaises(Exception, AWSCredentials) + # Without anything in os.environ, AWSService() blows up + os.environ[ENV_SECRET_KEY] = "bar" + self.assertRaises(ValueError, AWSCredentials) def test_no_secret_errors(self): - # Without anything in os.environ, AWSCredentials() blows up - os.environ['AWS_ACCESS_KEY_ID'] = 'bar' - self.assertRaises(Exception, AWSCredentials) + # Without anything in os.environ, AWSService() blows up + os.environ[ENV_ACCESS_KEY] = "foo" + self.assertRaises(ValueError, AWSCredentials) def test_found_values_used(self): - os.environ['AWS_SECRET_ACCESS_KEY'] = 'foo' - os.environ['AWS_ACCESS_KEY_ID'] = 'bar' - creds = AWSCredentials() - self.assertEqual('foo', creds.secret_key) - self.assertEqual('bar', creds.access_key) + os.environ[ENV_ACCESS_KEY] = "foo" + os.environ[ENV_SECRET_KEY] = "bar" + service = AWSCredentials() + self.assertEqual("foo", service.access_key) + self.assertEqual("bar", service.secret_key) + 
self.clean_environment() def test_explicit_access_key(self): - os.environ['AWS_SECRET_ACCESS_KEY'] = 'foo' - creds = AWSCredentials(access_key='bar') - self.assertEqual('foo', creds.secret_key) - self.assertEqual('bar', creds.access_key) + os.environ[ENV_SECRET_KEY] = "foo" + service = AWSCredentials(access_key="bar") + self.assertEqual("foo", service.secret_key) + self.assertEqual("bar", service.access_key) def test_explicit_secret_key(self): - os.environ['AWS_ACCESS_KEY_ID'] = 'bar' - creds = AWSCredentials(secret_key='foo') - self.assertEqual('foo', creds.secret_key) - self.assertEqual('bar', creds.access_key) + os.environ[ENV_ACCESS_KEY] = "bar" + service = AWSCredentials(secret_key="foo") + self.assertEqual("foo", service.secret_key) + self.assertEqual("bar", service.access_key) === added file 'txaws/tests/test_service.py' --- txaws/tests/test_service.py 1970-01-01 00:00:00 +0000 +++ txaws/tests/test_service.py 2009-08-25 16:16:22 +0000 @@ -0,0 +1,116 @@ +# Copyright (C) 2009 Duncan McGreggor <dun...@canonical.com> +# Licenced under the txaws licence available at /LICENSE in the txaws source. 
+ +import os + +from txaws.credentials import AWSCredentials +from txaws.ec2.client import EC2Client +from txaws.service import AWSServiceEndpoint, AWSServiceRegion, EC2_ENDPOINT_US +from txaws.tests import TXAWSTestCase + +class AWSServiceEndpointTestCase(TXAWSTestCase): + + def setUp(self): + self.endpoint = AWSServiceEndpoint(uri="http://my.service/da_endpoint") + + def test_simple_creation(self): + endpoint = AWSServiceEndpoint() + self.assertEquals(endpoint.scheme, "http") + self.assertEquals(endpoint.host, "") + self.assertEquals(endpoint.port, 80) + self.assertEquals(endpoint.path, "/") + self.assertEquals(endpoint.method, "GET") + + def test_parse_uri(self): + self.assertEquals(self.endpoint.scheme, "http") + self.assertEquals(self.endpoint.host, "my.service") + self.assertEquals(self.endpoint.port, 80) + self.assertEquals(self.endpoint.path, "/da_endpoint") + + def test_parse_uri_https_and_custom_port(self): + endpoint = AWSServiceEndpoint(uri="https://my.service:8080/endpoint") + self.assertEquals(endpoint.scheme, "https") + self.assertEquals(endpoint.host, "my.service") + self.assertEquals(endpoint.port, 8080) + self.assertEquals(endpoint.path, "/endpoint") + + def test_custom_method(self): + endpoint = AWSServiceEndpoint(uri="http://service/endpoint", + method="PUT") + self.assertEquals(endpoint.method, "PUT") + + def test_get_uri(self): + uri = self.endpoint.get_uri() + self.assertEquals(uri, "http://my.service/da_endpoint") + + def test_get_uri_custom_port(self): + uri = "https://my.service:8080/endpoint" + endpoint = AWSServiceEndpoint(uri=uri) + new_uri = endpoint.get_uri() + self.assertEquals(new_uri, uri) + + def test_set_path(self): + original_path = self.endpoint.path + self.endpoint.set_path("/newpath") + self.assertEquals( + self.endpoint.get_uri(), + "http://my.service/newpath") + + +class AWSServiceRegionTestCase(TXAWSTestCase): + + def setUp(self): + self.creds = AWSCredentials("foo", "bar") + self.region = 
AWSServiceRegion(creds=self.creds) + + def test_simple_creation(self): + self.assertEquals(self.creds, self.region.creds) + self.assertEquals(self.region._clients, {}) + self.assertEquals(self.region.ec2_endpoint.get_uri(), EC2_ENDPOINT_US) + + def test_get_client_with_empty_cache(self): + key = str(EC2Client) + str(self.creds) + str(self.region.ec2_endpoint) + original_client = self.region._clients.get(key) + new_client = self.region.get_client( + EC2Client, creds=self.creds, endpoint=self.region.ec2_endpoint) + self.assertEquals(original_client, None) + self.assertTrue(isinstance(new_client, EC2Client)) + self.assertNotEquals(original_client, new_client) + + def test_get_client_from_cache(self): + client1 = self.region.get_client( + EC2Client, creds=self.creds, endpoint=self.region.ec2_endpoint) + client2 = self.region.get_client( + EC2Client, creds=self.creds, endpoint=self.region.ec2_endpoint) + self.assertTrue(isinstance(client1, EC2Client)) + self.assertTrue(isinstance(client2, EC2Client)) + self.assertEquals(client1, client2) + + def test_get_client_from_cache_with_purge(self): + client1 = self.region.get_client( + EC2Client, creds=self.creds, endpoint=self.region.ec2_endpoint, + purge_cache=True) + client2 = self.region.get_client( + EC2Client, creds=self.creds, endpoint=self.region.ec2_endpoint, + purge_cache=True) + self.assertTrue(isinstance(client1, EC2Client)) + self.assertTrue(isinstance(client2, EC2Client)) + self.assertNotEquals(client1, client2) + + def test_get_ec2_client_from_cache(self): + client1 = self.region.get_ec2_client(self.creds) + client2 = self.region.get_ec2_client(self.creds) + self.assertEquals(self.creds, self.region.creds) + self.assertTrue(isinstance(client1, EC2Client)) + self.assertTrue(isinstance(client2, EC2Client)) + self.assertEquals(client1, client2) + + + def test_get_s3_client(self): + self.assertRaises(NotImplementedError, self.region.get_s3_client) + + def test_get_simpledb_client(self): + 
self.assertRaises(NotImplementedError, self.region.get_simpledb_client) + + def test_get_sqs_client(self): + self.assertRaises(NotImplementedError, self.region.get_sqs_client)
_______________________________________________
Mailing list: https://launchpad.net/~txawsteam
Post to     : txawsteam@lists.launchpad.net
Unsubscribe : https://launchpad.net/~txawsteam
More help   : https://help.launchpad.net/ListHelp