Started to fix some of the broken tests defined inside of `test_server_registration`. --- .../server/test/unit-test/rhnSQL/misc_functions.py | 174 +++++++++++++++++++-- .../unit-test/rhnSQL/test_server_registration.py | 117 +++++++++----- 2 files changed, 235 insertions(+), 56 deletions(-)
diff --git a/backend/server/test/unit-test/rhnSQL/misc_functions.py b/backend/server/test/unit-test/rhnSQL/misc_functions.py index d816907..efd9afb 100644 --- a/backend/server/test/unit-test/rhnSQL/misc_functions.py +++ b/backend/server/test/unit-test/rhnSQL/misc_functions.py @@ -229,8 +229,25 @@ class InvalidRoleError(Exception): pass +def get_server_arch_id(architecture): + lookup = """ + SELECT id + FROM rhnServerArch + WHERE label = :architecture + """ + h = rhnSQL.prepare(lookup) + h.execute( + architecture = architecture + ) + row = h.fetchone_dict() + + if row: + return row['id'] + else: + return None + def create_activation_key(org_id=None, user_id=None, groups=None, - channels=None, entitlement_level=None, note=None, server_id=None): + channels=None, entitlement_level=None, note=None, server_id=None, release=None): if org_id is None: need_user = 1 org_id = create_new_org() @@ -255,9 +272,25 @@ def create_activation_key(org_id=None, user_id=None, groups=None, if channels is None: channels = ['rhel-i386-as-3-beta', 'rhel-i386-as-2.1-beta'] + channel_arch_id = find_or_create_channel_arch( + name = "channel - test", + label = "test" + ) + # ensure channels are created - for channel in channels: - add_channel(channel) + for channel_label in channels: + channel = add_channel( + label = channel_label, + org_id = org_id, + channel_arch_id = channel_arch_id + ) + populate_rhn_dist_channel_map( + channel_id = channel['id'], + channel_arch_id = channel_arch_id, + org_id = org_id, + release = release + ) + if entitlement_level is None: entitlement_level = 'provisioning_entitled' @@ -301,6 +334,45 @@ def db_settings(backend): return settings +def grant_entitlements(org_id, entitlement, quantity): + activate_system_entitlement = rhnSQL.Procedure( + "rhn_entitlements.activate_system_entitlement") + + activate_system_entitlement(org_id, entitlement, quantity) + +def grant_channel_family_entitlements(org_id, channel_family, quantity): + """ + Check to see if org has a channelfamily associated with it. + If not, Create one. + """ + _lookup_chfam = """ + SELECT 1 from rhnChannelFamily + WHERE label='%s' + """ % channel_family + h = rhnSQL.prepare(_lookup_chfam) + row = h.execute() + # some extra check for upgrades + if row: + # Already exists, move on + return + _query_create_chfam = """ + INSERT INTO rhnChannelFamily + (id, name, label, org_id, product_url) + VALUES (sequence_nextval('rhn_channel_family_id_seq'), :name, :label, :org, :url) + + """ + h = rhnSQL.prepare(_query_create_chfam) + try: + h.execute( + name = 'Private Channel Family %s' % channel_family, + label = channel_family, + org = org_id, + url = '%s url' % channel_family + ) + except rhnSQL.SQLError, e: + # if we're here that means we're voilating something + raise + def find_or_create_arch_type(name, label): lookup = """ SELECT id from rhnArchType @@ -364,41 +436,109 @@ def find_or_create_channel_arch(name, label): return find_or_create_channel_arch(name, label) -def add_channel(label): +def add_channel(label, org_id, channel_arch_id): lookup = """ - SELECT 1 from rhnChannel - WHERE label='%s' - """ % label + SELECT * from rhnChannel + WHERE label = :label + """ + h = rhnSQL.prepare(lookup) - h.execute() + h.execute(label = label) row = h.fetchone_dict() + if row: - return + if row['org_id'] != org_id or row['channel_arch_id'] != channel_arch_id: + delete = "DELETE FROM rhnChannel WHERE id = :id" + h = rhnSQL.prepare(delete) + h.execute(id = row['id']) + rhnSQL.commit() + else: + return row query_create = """ INSERT INTO rhnChannel - (id, label, name, channel_arch_id, basedir, summary) - VALUES (sequence_nextval('rhn_channel_id_seq'), :label, :name, :channel_arch_id, :basedir, :summary) + (id, label, org_id, name, channel_arch_id, basedir, summary) + VALUES (sequence_nextval('rhn_channel_id_seq'), :label, :org_id, :name, :channel_arch_id, :basedir, :summary) """ - channel_arch_id = find_or_create_channel_arch( - name = "fake_itanium", - label = "Fake itanium" - ) - h = rhnSQL.prepare(query_create) try: - h.execute( label = label, + org_id = org_id, name = "Name for label %s" % label, channel_arch_id = channel_arch_id, basedir = 'basedir', summary = 'summary' ) rhnSQL.commit() + + h = rhnSQL.prepare(lookup) + h.execute(label = label) + return h.fetchone_dict() except rhnSQL.SQLError, e: # if we're here that means we're voilating something raise +def populate_rhn_dist_channel_map(channel_id, channel_arch_id, org_id, release): + if not release: + release = 'unit test' + + lookup = """ + SELECT 1 FROM rhnDistChannelMap + WHERE release = :release AND + channel_arch_id = :channel_arch_id AND + org_id = :org_id + """ + + h = rhnSQL.prepare(lookup) + h.execute( + release = release, + channel_arch_id = channel_arch_id, + org_id = org_id + ) + if h.fetchone_dict(): + return + + query_create = """ + INSERT INTO rhnDistChannelMap + (os, release, channel_arch_id, channel_id, org_id) + VALUES (:os, :release, :channel_arch_id, :channel_id, :org_id) + """ + + h = rhnSQL.prepare(query_create) + h.execute( + os = "TestOS", + release = release, + channel_arch_id = channel_arch_id, + channel_id = channel_id, + org_id = org_id + ) + rhnSQL.commit() + +def add_channel_to_server(channel_id, server_id): + lookup = """ + SELECT 1 FROM rhnServerChannel + WHERE server_id = :server_id AND + channel_id = :channel_id + """ + + h = rhnSQL.prepare(lookup) + row = h.execute(server_id = server_id, channel_id = channel_id) + if row: + return + + query_create = """ + INSERT INTO rhnServerChannel + (server_id, channel_id) + VALUES (:server_id, :channel_id) + """ + + h = rhnSQL.prepare(query_create) + h.execute( + channel_id = channel_id, + server_id = server_id + ) + rhnSQL.commit() + diff --git a/backend/server/test/unit-test/rhnSQL/test_server_registration.py b/backend/server/test/unit-test/rhnSQL/test_server_registration.py index 169b9c2..427e1f1 100644 --- a/backend/server/test/unit-test/rhnSQL/test_server_registration.py +++ b/backend/server/test/unit-test/rhnSQL/test_server_registration.py @@ -18,17 +18,16 @@ # import sys +import time import unittest from spacewalk.common import rhnFlags -from spacewalk.common.rhnConfig import initCFG -from spacewalk.server import rhnSQL, rhnServer, rhnChannel -from spacewalk.server.xmlrpc import registration +from spacewalk.common.rhnConfig import initCFG, CFG +from spacewalk.server import rhnSQL, rhnServer, rhnChannel, rhnUser +from spacewalk.server.handlers.xmlrpc import registration import misc_functions DB_SETTINGS = misc_functions.db_settings("oracle") - - class Tests(unittest.TestCase): _channel = 'redhat-advanced-server-i386' _channel_family = 'rhel-as' @@ -44,6 +43,7 @@ class Tests(unittest.TestCase): password = DB_SETTINGS["password"], database = DB_SETTINGS["database"] ) + rhnSQL.clear_log_id() def tearDown(self): # Roll back any unsaved data @@ -51,11 +51,21 @@ class Tests(unittest.TestCase): def test_new_server_1(self): "Test normal server registration, with username/password" - u = self._create_new_user() - username = u.contact['login'] - password = u.contact['password'] + u, password = self._create_new_user() + username = u.contact['login'] + org_id = u.contact['org_id'] + entitlements = self._entitlements + os_release = "2.1as" + + t = misc_functions.create_activation_key( + org_id = u.contact['org_id'], + entitlement_level = entitlements, + user_id = u.getid(), + release = os_release + ) + params = build_new_system_params_with_username(username=username, - password=password, os_release="2.1AS") + password=password, os_release=os_release) system_id = register_new_system(params) rhnSQL.commit() @@ -69,16 +79,25 @@ class Tests(unittest.TestCase): self.assertEqual(channels[0]['label'], self._channel) def test_new_server_token_1(self): - "Test registration with token" - u = self._create_new_user() - org_id = u.contact['org_id'] + "test registration with token" + u, _ = self._create_new_user() + org_id = u.contact['org_id'] entitlements = self._entitlements - t = misc_functions.create_activation_key(org_id=u.contact['org_id'], - entitlement_level=entitlements, user_id=u.getid()) + os_release = "2.1as" + + t = misc_functions.create_activation_key( + org_id = u.contact['org_id'], + entitlement_level = entitlements, + user_id = u.getid(), + release = os_release + ) token = t.get_token() - params = build_new_system_params_with_token(token=token) + params = build_new_system_params_with_token( + token=token, + os_release=os_release + ) system_id = register_new_system(params) rhnSQL.commit() @@ -88,18 +107,27 @@ class Tests(unittest.TestCase): def test_new_server_token_2(self): "Test registration with token that specifies a base channel" - u = self._create_new_user() - org_id = u.contact['org_id'] + + # FIXME: the test fails because there's no channel associated with the + # freshly created Server: rhnServerChannel is not populated by the + # registration code. + + u, _ = self._create_new_user() + org_id = u.contact['org_id'] base_channel = 'rhel-i386-as-3' entitlements = self._entitlements + os_release = "2.1as" + t = misc_functions.create_activation_key(org_id=u.contact['org_id'], entitlement_level=entitlements, user_id=u.getid(), - channels=[base_channel]) + channels=[base_channel], release=os_release) token = t.get_token() - params = build_new_system_params_with_token(token=token, - os_release="2.1AS") + params = build_new_system_params_with_token( + token=token, + os_release=os_release + ) system_id = register_new_system(params) rhnSQL.commit() @@ -114,9 +142,10 @@ class Tests(unittest.TestCase): def test_new_server_reactivation_token_1(self): "Test server re-registration" - u = self._create_new_user() - username = u.contact['login'] - password = u.contact['password'] + u, password = self._create_new_user() + username = u.contact['login'] + os_release = "2.1AS" + params = build_new_system_params_with_username(username=username, password=password, os_release="2.1AS") @@ -134,12 +163,14 @@ class Tests(unittest.TestCase): entitlements = self._entitlements t = misc_functions.create_activation_key(org_id=u.contact['org_id'], entitlement_level=entitlements, user_id=u.getid(), - channels=[base_channel], server_id=server_id_1) + channels=[base_channel], server_id=server_id_1, release=os_release) token = t.get_token() - params = build_new_system_params_with_token(token=token, - os_release="2.1AS") + params = build_new_system_params_with_token( + token=token, + os_release=os_release + ) system_id = register_new_system(params) rhnSQL.commit() @@ -158,17 +189,19 @@ class Tests(unittest.TestCase): Resulting server group is the union of all server groups from all tokens """ - u = self._create_new_user() - org_id = u.contact['org_id'] + u, _ = self._create_new_user() + org_id = u.contact['org_id'] entitlements = self._entitlements + os_release = "2.1AS" + t = misc_functions.create_activation_key(org_id=u.contact['org_id'], - entitlement_level=entitlements, user_id=u.getid()) + entitlement_level=entitlements, user_id=u.getid(), release=os_release) token1 = t.get_token() sg1 = t.get_server_groups() t = misc_functions.create_activation_key(org_id=u.contact['org_id'], - entitlement_level=entitlements, user_id=u.getid()) + entitlement_level=entitlements, user_id=u.getid(), release=os_release) token2 = t.get_token() sg2 = t.get_server_groups() @@ -176,7 +209,7 @@ class Tests(unittest.TestCase): token = token1 + ',' + token2 params = build_new_system_params_with_token(token=token, - os_release="2.1AS") + os_release=os_release) system_id = register_new_system(params) rhnSQL.commit() @@ -195,18 +228,25 @@ class Tests(unittest.TestCase): def _create_new_user(self): # Create new org org_id = misc_functions.create_new_org() + users_unencrypted_password = "unittest-password-%.3f" % time.time() # Grant entitlements to the org misc_functions.grant_entitlements(org_id, 'enterprise_entitled', 1) - misc_functions.grant_channel_family_entitlements(org_id, - self._channel_family, 1) + misc_functions.grant_channel_family_entitlements( + org_id, + "%s-%.3f" % (self._channel_family, time.time()), + 1 + ) # Create new user - u = misc_functions.create_new_user(org_id=org_id, roles=['org_admin']) - username = u.contact['login'] - # XXX This will break on satellites where passwords are encrypted - password = u.contact['password'] - return u + u = misc_functions.create_new_user( + org_id = org_id, + roles = ['org_admin'], + password = users_unencrypted_password, + encrypt_password = CFG.encrypted_passwords + ) + + return u, users_unencrypted_password class Counter: _counter = 0 @@ -216,7 +256,6 @@ class Counter: return val def build_new_system_params_with_username(**kwargs): - import time val = Counter().value() rnd_string = "%d-%d" % (int(time.time()), val) -- 1.8.4 _______________________________________________ Spacewalk-devel mailing list Spacewalk-devel@redhat.com https://www.redhat.com/mailman/listinfo/spacewalk-devel