Hi Dave, Please find attached the patch for the Cast node. (These changes are required due to the drop objects functionality.)
To run the test cases, please enter the following command: python regression/runtests.py --pkg browser.server_groups.servers.databases.casts Let me know if any changes are required. -- Best, Priyanka EnterpriseDB Corporation The Enterprise PostgreSQL Company
diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/__init__.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/__init__.py index bfb0263..b86b208 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/__init__.py +++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/__init__.py @@ -13,4 +13,4 @@ from pgadmin.utils.route import BaseTestGenerator class CastTestGenerator(BaseTestGenerator): def runTest(self): - return [] + return [] diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_add.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_add.py index 0dff330..bfa836e 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_add.py +++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_add.py @@ -7,12 +7,18 @@ # # ################################################################## +from __future__ import print_function from pgadmin.utils.route import BaseTestGenerator +from regression import parent_node_dict from regression import test_utils as utils from . import utils as cast_utils from pgadmin.browser.server_groups.servers.databases.tests import \ utils as database_utils -from pgadmin.browser.server_groups.servers.tests import utils as server_utils +import json +import os +import sys + +file_name = os.path.basename(__file__) class CastsAddTestCase(BaseTestGenerator): @@ -21,45 +27,48 @@ class CastsAddTestCase(BaseTestGenerator): ('Check Cast Node', dict(url='/browser/cast/obj/')) ] - @classmethod - def setUpClass(cls): - """ - This function perform the following tasks: - 1. Add and connect to the test server(s) - 2. 
Add database(s) connected to server(s) - - :return: None - """ - - # Add the server - server_utils.add_server(cls.tester) - - # Connect to server - cls.server_connect_response, cls.server_group, cls.server_ids = \ - server_utils.connect_server(cls.tester) - - if len(cls.server_connect_response) == 0: - raise Exception("No Server(s) connected to add the database!!!") - - # Add database - database_utils.add_database(cls.tester, cls.server_connect_response, - cls.server_ids) - def runTest(self): - """ This function will add cast under database node. """ + """ This function will add cast under test database. """ + + self.server_data = parent_node_dict["database"][0] + self.server_id = self.server_data["server_id"] + self.db_id = self.server_data['db_id'] + db_con = database_utils.connect_database(self, + utils.SERVER_GROUP, + self.server_id, + self.db_id) + if not db_con["info"] == "Database connected.": + raise Exception("Could not connect to database.") + try: - cast_utils.add_cast(self.tester) + self.data = cast_utils.get_cast_data() - @classmethod - def tearDownClass(cls): - """ - This function deletes the added cast, database, server and the - 'parent_id.pkl' file which is created in setUpClass. 
+ response = self.tester.post( + self.url + str(utils.SERVER_GROUP) + '/' + + str(self.server_id) + '/' + str( + self.db_id) + '/', + data=json.dumps(self.data), + content_type='html/json') + self.assertEquals(response.status_code, 200) + response_data = json.loads(response.data.decode('utf-8')) + cast_id = response_data['node']['_id'] + cast_dict = {"cast_id": cast_id} + utils.write_node_info(int(self.server_id), "cid", cast_dict) + except Exception as exception: + exception = "Exception: %s: line:%s %s" % ( + file_name, sys.exc_traceback.tb_lineno, exception) + print(exception, file=sys.stderr) + raise Exception(exception) - :return: None - """ + def tearDown(self): + """This function disconnect the test database and drop added cast.""" - cast_utils.delete_cast(cls.tester) - database_utils.delete_database(cls.tester) - server_utils.delete_server(cls.tester) - utils.delete_parent_id_file() + connection = utils.get_db_connection(self.server_data['db_name'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port']) + cast_utils.drop_cast(connection, self.data["srctyp"], + self.data["trgtyp"]) + database_utils.disconnect_database(self, self.server_id, + self.db_id) diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_delete.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_delete.py index f3a92c9..b6ab0b9 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_delete.py +++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_delete.py @@ -1,102 +1,86 @@ -# # ################################################################# -# # -# # pgAdmin 4 - PostgreSQL Tools -# # -# # Copyright (C) 2013 - 2016, The pgAdmin Development Team -# # This software is released under the PostgreSQL Licence -# # -# # ################################################################## +# 
################################################################# +# +# pgAdmin 4 - PostgreSQL Tools +# +# Copyright (C) 2013 - 2016, The pgAdmin Development Team +# This software is released under the PostgreSQL Licence +# +# ################################################################## +from __future__ import print_function from pgadmin.utils.route import BaseTestGenerator from regression import test_utils as utils -from regression.test_utils import get_ids +from regression import parent_node_dict from . import utils as cast_utils -from pgadmin.browser.server_groups.servers.tests import utils as server_utils from pgadmin.browser.server_groups.servers.databases.tests import \ utils as database_utils -from regression.test_setup import advanced_config_data -import json +import os +import sys + +file_name = os.path.basename(__file__) class CastsDeleteTestCase(BaseTestGenerator): - """ This class will fetch the cast node added under database node. """ + """ This class will delete the cast node added under database node. """ scenarios = [ # Fetching default URL for cast node. ('Check Cast Node', dict(url='/browser/cast/obj/')) ] - @classmethod - def setUpClass(cls): - """ - This function perform the following tasks: - 1. Add and connect to the test server(s) - 2. Add database(s) connected to server(s) - 3. 
Add cast(s) to databases - - :return: None - """ - - # Add the server - server_utils.add_server(cls.tester) + def setUp(self): - # Connect to server - cls.server_connect_response, cls.server_group, cls.server_ids = \ - server_utils.connect_server(cls.tester) + self.default_db = self.server["db"] + self.database_info = parent_node_dict['database'][0] + self.db_name = self.database_info['db_name'] + self.server["db"] = self.db_name + self.source_type = 'circle' + self.target_type = 'line' - if len(cls.server_connect_response) == 0: - raise Exception("No Server(s) connected to add the database!!!") - - # Add database - database_utils.add_database(cls.tester, cls.server_connect_response, - cls.server_ids) - - # Add cast(s) to database(s) - cast_utils.add_cast(cls.tester) + self.cast_id = cast_utils.create_cast(self.server, self.source_type, + self.target_type) def runTest(self): - """ This function will delete added cast(s).""" - - all_id = get_ids() - server_ids = all_id["sid"] - db_ids_dict = all_id["did"][0] - cast_ids_dict = all_id["cid"][0] - - for server_id in server_ids: - db_id = db_ids_dict[int(server_id)] - db_con = database_utils.verify_database(self.tester, - utils.SERVER_GROUP, - server_id, - db_id) - if len(db_con) == 0: - raise Exception("No database(s) to delete for server id %s" - % server_id) - cast_id = cast_ids_dict[server_id] - cast_get_data = cast_utils.verify_cast(self.tester, - utils.SERVER_GROUP, - server_id, - db_id, cast_id) - - if cast_get_data.status_code == 200: - + """ This function will delete added cast.""" + + self.server_id = self.database_info["server_id"] + self.db_id = self.database_info['db_id'] + + db_con = database_utils.connect_database(self, + utils.SERVER_GROUP, + self.server_id, + self.db_id) + if not db_con["info"] == "Database connected.": + raise Exception("Could not connect to database.") + try: + connection = utils.get_db_connection(self.server['db'], + self.server['username'], + self.server['db_password'], + 
self.server['host'], + self.server['port']) + response = cast_utils.verify_cast(connection, self.source_type, + self.target_type) + + if len(response) == 0: + raise Exception("Could not find cast.") + else: delete_response = self.tester.delete( - self.url + str(utils.SERVER_GROUP) + '/' + - str(server_id) + '/' + str(db_id) + - '/' + str(cast_id), - follow_redirects=True) - response_data = json.loads(delete_response.data.decode('utf-8')) - self.assertTrue(response_data['success'], 1) - - @classmethod - def tearDownClass(cls): - """ - This function delete the added cast, database, server and the - 'parent_id.pkl' file which is created in setUpClass. - - :return: None - """ - - database_utils.delete_database(cls.tester) - server_utils.delete_server(cls.tester) - utils.delete_parent_id_file() + self.url + str(utils.SERVER_GROUP) + '/' + + str(self.server_id) + '/' + str(self.db_id) + + '/' + str(self.cast_id), + follow_redirects=True) + self.assertEquals(delete_response.status_code, 200) + + except Exception as exception: + exception = "Exception: %s: line:%s %s" % ( + file_name, sys.exc_traceback.tb_lineno, exception) + print(exception, file=sys.stderr) + raise Exception(exception) + + def tearDown(self): + """This function will disconnect test database.""" + + database_utils.disconnect_database(self, self.server_id, + self.db_id) + self.server['db'] = self.default_db diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get.py index 20702e1..144c762 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get.py +++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get.py @@ -1,19 +1,23 @@ -# ################################################################# +################################################################## # # pgAdmin 4 - PostgreSQL Tools # # Copyright (C) 2013 - 2016, The 
pgAdmin Development Team # This software is released under the PostgreSQL Licence # -# ################################################################## +################################################################### +from __future__ import print_function from pgadmin.utils.route import BaseTestGenerator from regression import test_utils as utils -from regression.test_utils import get_ids +from regression import parent_node_dict from . import utils as cast_utils -from pgadmin.browser.server_groups.servers.tests import utils as server_utils from pgadmin.browser.server_groups.servers.databases.tests import \ utils as database_utils +import os +import sys + +file_name = os.path.basename(__file__) class CastsGetTestCase(BaseTestGenerator): @@ -24,64 +28,53 @@ class CastsGetTestCase(BaseTestGenerator): ('Check Cast Node', dict(url='/browser/cast/obj/')) ] - @classmethod - def setUpClass(cls): - """ - This function used to add the sever, database, and cast - - :return: None - """ - - # Add the server - server_utils.add_server(cls.tester) - - # Connect to server - cls.server_connect_response, cls.server_group, cls.server_ids = \ - server_utils.connect_server(cls.tester) - - if len(cls.server_connect_response) == 0: - raise Exception("No Server(s) connected to add the database!!!") + def setUp(self): + """ This function will create cast.""" - # Add database - database_utils.add_database(cls.tester, cls.server_connect_response, - cls.server_ids) + self.default_db = self.server["db"] + self.database_info = parent_node_dict['database'][0] + self.db_name = self.database_info['db_name'] + self.server["db"] = self.db_name + self.source_type = 'money' + self.target_type = 'bigint' - cast_utils.add_cast(cls.tester) + self.cast_id = cast_utils.create_cast(self.server, self.source_type, + self.target_type) def runTest(self): - """ This function will get added cast.""" - - all_id = get_ids() - server_ids = all_id["sid"] - db_ids_dict = all_id["did"][0] - cast_ids_dict = 
all_id["cid"][0] - - for server_id in server_ids: - db_id = db_ids_dict[int(server_id)] - db_con = database_utils.verify_database(self.tester, - utils.SERVER_GROUP, - server_id, - db_id) - if len(db_con) == 0: - raise Exception("No database(s) to delete for server id %s" - % server_id) - cast_id = cast_ids_dict[server_id] - cast_get_data = cast_utils.verify_cast(self.tester, - utils.SERVER_GROUP, - server_id, - db_id, cast_id) - self.assertEquals(cast_get_data.status_code, 200) - - @classmethod - def tearDownClass(cls): - """ - This function deletes the added cast, database, server and the - 'parent_id.pkl' file which is created in setup() function. - - :return: None - """ - - cast_utils.delete_cast(cls.tester) - database_utils.delete_database(cls.tester) - server_utils.delete_server(cls.tester) - utils.delete_parent_id_file() + """ This function will fetch added cast.""" + + self.server_id = self.database_info["server_id"] + self.db_id = self.database_info['db_id'] + db_con = database_utils.connect_database(self, + utils.SERVER_GROUP, + self.server_id, + self.db_id) + if not db_con["info"] == "Database connected.": + raise Exception("Could not connect to database.") + try: + response = self.tester.get( + self.url + str(utils.SERVER_GROUP) + '/' + str( + self.server_id) + '/' + + str(self.db_id) + '/' + str(self.cast_id), + content_type='html/json') + self.assertEquals(response.status_code, 200) + except Exception as exception: + exception = "Exception: %s: line:%s %s" % ( + file_name, sys.exc_traceback.tb_lineno, exception) + print(exception, file=sys.stderr) + raise Exception(exception) + + def tearDown(self): + """This function disconnect the test database and drop added cast.""" + + connection = utils.get_db_connection(self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port']) + cast_utils.drop_cast(connection, self.source_type, + self.target_type) + database_utils.disconnect_database(self, 
self.server_id, + self.db_id) + self.server['db'] = self.default_db diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_put.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_put.py index ea67c77..b2c378e 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_put.py +++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_put.py @@ -7,15 +7,18 @@ # # ################################################################## +from __future__ import print_function from pgadmin.utils.route import BaseTestGenerator from regression import test_utils as utils -from regression.test_utils import get_ids +from regression import parent_node_dict from . import utils as cast_utils -from pgadmin.browser.server_groups.servers.tests import utils as server_utils from pgadmin.browser.server_groups.servers.databases.tests import \ utils as database_utils -from regression.test_setup import advanced_config_data import json +import os +import sys + +file_name = os.path.basename(__file__) class CastsPutTestCase(BaseTestGenerator): @@ -26,82 +29,73 @@ class CastsPutTestCase(BaseTestGenerator): ('Check Cast Node', dict(url='/browser/cast/obj/')) ] - @classmethod - def setUpClass(cls): - """ - This function perform the following tasks: - 1. Add and connect to the test server(s) - 2. Add database(s) connected to server(s) - 3. 
Add cast(s) to databases - - :return: None - """ - - # Add the server - server_utils.add_server(cls.tester) - # Connect to server - cls.server_connect_response, cls.server_group, cls.server_ids = \ - server_utils.connect_server(cls.tester) - - if len(cls.server_connect_response) == 0: - raise Exception("No Server(s) connected to add the database!!!") + def setUp(self): + """ This function will create cast.""" - # Add database - database_utils.add_database(cls.tester, cls.server_connect_response, - cls.server_ids) + self.default_db = self.server["db"] + self.database_info = parent_node_dict['database'][0] + self.db_name = self.database_info['db_name'] + self.server["db"] = self.db_name + self.source_type = 'character' + self.target_type = 'cidr' - cast_utils.add_cast(cls.tester) + self.cast_id = cast_utils.create_cast(self.server, self.source_type, + self.target_type) def runTest(self): """ This function will update added cast.""" - all_id = get_ids() - server_ids = all_id["sid"] - db_ids_dict = all_id["did"][0] - cast_ids_dict = all_id["cid"][0] - - for server_id in server_ids: - db_id = db_ids_dict[int(server_id)] - db_con = database_utils.verify_database(self.tester, - utils.SERVER_GROUP, - server_id, - db_id) - if len(db_con) == 0: - raise Exception("No database(s) to delete for server id %s" - % server_id) - cast_id = cast_ids_dict[server_id] - cast_get_data = cast_utils.verify_cast(self.tester, - utils.SERVER_GROUP, - server_id, - db_id, cast_id) - - if cast_get_data.status_code == 200: - + self.server_id = self.database_info["server_id"] + self.db_id = self.database_info['db_id'] + + db_con = database_utils.connect_database(self, + utils.SERVER_GROUP, + self.server_id, + self.db_id) + if not db_con["info"] == "Database connected.": + raise Exception("Could not connect to database.") + try: + connection = utils.get_db_connection(self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port']) + response 
= cast_utils.verify_cast(connection, self.source_type, + self.target_type) + + if len(response) == 0: + raise Exception("Could not find cast.") + else: data = { - "description": advanced_config_data["cast_update_data"] - ["comment"], - "id": cast_id + "description": "This is cast update comment", + "id": self.cast_id } put_response = self.tester.put( self.url + str(utils.SERVER_GROUP) + '/' + - str(server_id) + '/' + str( - db_id) + - '/' + str(cast_id), + str(self.server_id) + '/' + str( + self.db_id) + + '/' + str(self.cast_id), data=json.dumps(data), follow_redirects=True) self.assertEquals(put_response.status_code, 200) - @classmethod - def tearDownClass(cls): - """ - This function deletes the added cast, database, server and the - 'parent_id.pkl' file which is created in setUpClass. - - :return: None - """ - - cast_utils.delete_cast(cls.tester) - database_utils.delete_database(cls.tester) - server_utils.delete_server(cls.tester) - utils.delete_parent_id_file() + except Exception as exception: + exception = "Exception: %s: line:%s %s" % ( + file_name, sys.exc_traceback.tb_lineno, exception) + print(exception, file=sys.stderr) + raise Exception(exception) + + def tearDown(self): + """This function disconnect the test database and drop added cast.""" + + connection = utils.get_db_connection(self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port']) + cast_utils.drop_cast(connection, self.source_type, + self.target_type) + database_utils.disconnect_database(self, self.server_id, + self.db_id) + self.server['db'] = self.default_db diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/utils.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/utils.py index 0e89513..95becd2 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/utils.py +++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/utils.py @@ -7,139 +7,103 @@ # # 
################################################################## +from __future__ import print_function import os -import pickle -import json -from regression.test_setup import advanced_config_data, pickle_path -from regression import test_utils as utils -from pgadmin.browser.server_groups.servers.databases.tests import \ - utils as database_utils -from pgadmin.browser.server_groups.servers.tests import utils as server_utils +import sys +from regression.test_utils import get_db_connection +file_name = os.path.basename(__file__) CAST_URL = '/browser/cast/obj/' -def get_cast_config_data(server_connect_data): - - adv_config_data = None - db_user = server_connect_data['data']['user']['name'] - - # Get the config data of appropriate db user - for config_test_data in advanced_config_data['casts_credentials']: - if db_user == config_test_data['owner']: - adv_config_data = config_test_data - +def get_cast_data(): data = { - "castcontext": adv_config_data - ['cast_context'], - "encoding": adv_config_data - ['encoding'], - "name": adv_config_data - ['name'], - "srctyp": adv_config_data - ['source_type'], - "trgtyp": adv_config_data - ['target_type'] - } + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint", + } return data -def add_cast(tester): - """ - This function add the cast in the existing database - - :param tester: test object - :type tester: flask test object - :return:None - """ - - all_id = utils.get_ids() - server_ids = all_id["sid"] - db_ids_dict = all_id["did"][0] - server_group = utils.config_data['server_group'] - - for server_id in server_ids: - db_id = db_ids_dict[int(server_id)] - db_con = database_utils.verify_database(tester, server_group, - server_id, db_id) - if db_con['data']['connected']: - server_connect_response = server_utils.verify_server( - tester, server_group, server_id) - - data = get_cast_config_data(server_connect_response) - - response = tester.post(CAST_URL + 
str(server_group) + '/' + - str(server_id) + '/' + str( - db_id) + '/', - data=json.dumps(data), - content_type='html/json') - - assert response.status_code == 200 - response_data = json.loads(response.data.decode('utf-8')) - write_cast_info(response_data, server_id) - - -def write_cast_info(response_data, server_id): +def create_cast(server, source_type, target_type): """ - This function writes the server's details to file parent_id.pkl + This function used to create cast in the existing dummy database - :param response_data: server's data - :type response_data: list of dictionary - :param pickle_id_dict: contains ids of server,database,tables etc. - :type pickle_id_dict: dict - :return: None + :param server: test_server, cast_source_type, cast_target_type + :return: cast_id """ - cast_id = response_data['node']['_id'] - pickle_id_dict = utils.get_pickle_id_dict() - if os.path.isfile(pickle_path): - existing_server_id = open(pickle_path, 'rb') - tol_server_id = pickle.load(existing_server_id) - pickle_id_dict = tol_server_id - if 'cid' in pickle_id_dict: - if pickle_id_dict['cid']: - # Add the cast_id as value in dict - pickle_id_dict["cid"][0].update({server_id: cast_id}) - else: - # Create new dict with server_id and cast_id - pickle_id_dict["cid"].append({server_id: cast_id}) - cast_output = open(pickle_path, 'wb') - pickle.dump(pickle_id_dict, cast_output) - cast_output.close() - - -def verify_cast(tester, server_group, server_id, db_id, cast_id): - - cast_response = tester.get(CAST_URL + str(server_group) + '/' + - str(server_id) + '/' + str(db_id) + - '/' + str(cast_id), - content_type='html/json') - return cast_response - - -def delete_cast(tester): - all_id = utils.get_ids() - server_ids = all_id["sid"] - db_ids_dict = all_id["did"][0] - cast_ids_dict = all_id["cid"][0] - - for server_id in server_ids: - db_id = db_ids_dict[int(server_id)] - db_con = database_utils.verify_database(tester, utils.SERVER_GROUP, - server_id, - db_id) - if len(db_con) == 0: - 
raise Exception("No database(s) to delete for server id %s" - % server_id) - cast_id = cast_ids_dict[server_id] - cast_get_data = verify_cast(tester, utils.SERVER_GROUP, - server_id, - db_id, cast_id) - - if cast_get_data.status_code == 200: - delete_response = tester.delete( - CAST_URL + str(utils.SERVER_GROUP) + '/' + - str(server_id) + '/' + str(db_id) + - '/' + str(cast_id), - follow_redirects=True) - return delete_response + try: + connection = get_db_connection(server['db'], + server['username'], + server['db_password'], + server['host'], + server['port']) + old_isolation_level = connection.isolation_level + connection.set_isolation_level(0) + pg_cursor = connection.cursor() + pg_cursor.execute("CREATE CAST (%s AS %s) WITHOUT" + " FUNCTION AS IMPLICIT" % (source_type, target_type)) + + connection.set_isolation_level(old_isolation_level) + connection.commit() + + # Get 'oid' from newly created cast + pg_cursor.execute( + "SELECT ca.oid FROM pg_cast ca WHERE ca.castsource = " + "(SELECT t.oid FROM pg_type t WHERE format_type(t.oid, NULL)='%s') " + "AND ca.casttarget = (SELECT t.oid FROM pg_type t WHERE " + "format_type(t.oid, NULL) = '%s')" % (source_type, target_type)) + oid = pg_cursor.fetchone() + cast_id = '' + if oid: + cast_id = oid[0] + connection.close() + return cast_id + except Exception as exception: + exception = "Exception: %s: line:%s %s" % ( + file_name, sys.exc_traceback.tb_lineno, exception) + print(exception, file=sys.stderr) + + +def verify_cast(connection, source_type, target_type): + """ This function will verify current cast.""" + + try: + pg_cursor = connection.cursor() + + pg_cursor.execute( + "SELECT * FROM pg_cast ca WHERE ca.castsource = " + "(SELECT t.oid FROM pg_type t WHERE format_type(t.oid, NULL)='%s') " + "AND ca.casttarget = (SELECT t.oid FROM pg_type t WHERE " + "format_type(t.oid, NULL) = '%s')" % (source_type, target_type)) + casts = pg_cursor.fetchall() + connection.close() + return casts + except Exception as exception: + 
exception = "%s: line:%s %s" % ( + file_name, sys.exc_traceback.tb_lineno, exception) + print(exception, file=sys.stderr) + + +def drop_cast(connection, source_type, target_type): + """This function used to drop the cast""" + + try: + pg_cursor = connection.cursor() + pg_cursor.execute( + "SELECT * FROM pg_cast ca WHERE ca.castsource = " + "(SELECT t.oid FROM pg_type t WHERE format_type(t.oid, NULL)='%s') " + "AND ca.casttarget = (SELECT t.oid FROM pg_type t WHERE " + "format_type(t.oid, NULL) = '%s')" % (source_type, target_type)) + if pg_cursor.fetchall(): + pg_cursor.execute( + "DROP CAST (%s AS %s) CASCADE" % (source_type, target_type)) + connection.commit() + connection.close() + except Exception as exception: + exception = "%s: line:%s %s" % ( + file_name, sys.exc_traceback.tb_lineno, exception) + print(exception, file=sys.stderr) diff --git a/web/pgadmin/browser/server_groups/servers/databases/tests/utils.py b/web/pgadmin/browser/server_groups/servers/databases/tests/utils.py index 6b26863..c1ef3a7 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/tests/utils.py +++ b/web/pgadmin/browser/server_groups/servers/databases/tests/utils.py @@ -115,7 +115,7 @@ def create_database(connection, db_name): raise Exception("Error while creating database. 
%s" % exception) -def verify_database(self, server_group, server_id, db_id): +def connect_database(self, server_group, server_id, db_id): """ This function verifies that database is exists and whether it connect successfully or not diff --git a/web/pgadmin/utils/route.py b/web/pgadmin/utils/route.py index 74269de..be75478 100644 --- a/web/pgadmin/utils/route.py +++ b/web/pgadmin/utils/route.py @@ -7,15 +7,17 @@ # ############################################################## +from __future__ import print_function import sys import unittest - +import os from abc import ABCMeta, abstractmethod from importlib import import_module from werkzeug.utils import find_modules - import config +file_name = os.path.basename(__file__) + class TestsGeneratorRegistry(ABCMeta): """ @@ -69,8 +71,10 @@ class TestsGeneratorRegistry(ABCMeta): try: if "tests." in str(module_name): cls.import_app_modules(module_name) - except ImportError: - pass + except Exception as exception: + exception = "%s: line:%s %s" % ( + file_name, sys.exc_traceback.tb_lineno, exception) + print(exception, file=sys.stderr) else: for module_name in find_modules(pkg, False, True): try: @@ -78,8 +82,11 @@ class TestsGeneratorRegistry(ABCMeta): # is False if "pgadmin.browser.tests" not in module_name: cls.import_app_modules(module_name) - except ImportError: - pass + except Exception as exception: + exception = "%s: line:%s %s" % ( + file_name, sys.exc_traceback.tb_lineno, exception) + print(exception, file=sys.stderr) + import six diff --git a/web/regression/__init__.py b/web/regression/__init__.py index 6b7cb2f..937cbcf 100644 --- a/web/regression/__init__.py +++ b/web/regression/__init__.py @@ -28,10 +28,8 @@ node_info_dict = { "seid": [] # sequence } -global test_server_dict -test_server_dict = { +global parent_node_dict +parent_node_dict = { "server": [], - "database": [], - "tablespace": [], - "role": [] + "database": [] } diff --git a/web/regression/runtests.py b/web/regression/runtests.py index 
87b6004..8be9fff 100644 --- a/web/regression/runtests.py +++ b/web/regression/runtests.py @@ -10,7 +10,6 @@ """ This file collect all modules/files present in tests directory and add them to TestSuite. """ from __future__ import print_function - import argparse import os import sys @@ -35,7 +34,7 @@ if sys.path[0] != root: from pgadmin import create_app import config -import test_setup +from regression import test_setup # Delete SQLite db file if exists if os.path.isfile(config.TEST_SQLITE_PATH): @@ -59,14 +58,14 @@ if pgadmin_credentials: 'login_password'] # Execute the setup file -exec (open("setup.py").read()) +exec(open("setup.py").read()) # Get the config database schema version. We store this in pgadmin.model # as it turns out that putting it in the config files isn't a great idea from pgadmin.model import SCHEMA_VERSION # Delay the import test_utils as it needs updated config.SQLITE_PATH -import test_utils +from regression import test_utils config.SETTINGS_SCHEMA_VERSION = SCHEMA_VERSION diff --git a/web/regression/test_utils.py b/web/regression/test_utils.py index 731f1b6..a5cc10c 100644 --- a/web/regression/test_utils.py +++ b/web/regression/test_utils.py @@ -40,7 +40,7 @@ def login_tester_account(tester): :type tester: flask test client object :return: None """ - if os.environ['PGADMIN_SETUP_EMAIL'] and\ + if os.environ['PGADMIN_SETUP_EMAIL'] and \ os.environ['PGADMIN_SETUP_PASSWORD']: email = os.environ['PGADMIN_SETUP_EMAIL'] password = os.environ['PGADMIN_SETUP_PASSWORD'] @@ -145,22 +145,19 @@ def create_database(server, db_name): def drop_database(connection, database_name): """This function used to drop the database""" - try: - pg_cursor = connection.cursor() - pg_cursor.execute("SELECT * FROM pg_database db WHERE db.datname='%s'" - % database_name) - if pg_cursor.fetchall(): - # Release pid if any process using database - pg_cursor.execute("select pg_terminate_backend(pid) from" - " pg_stat_activity where datname='%s'" % - database_name) - 
old_isolation_level = connection.isolation_level - connection.set_isolation_level(0) - pg_cursor.execute('''DROP DATABASE "%s"''' % database_name) - connection.set_isolation_level(old_isolation_level) - connection.commit() - connection.close() + if database_name not in ["postgres", "template1", "template0"]: + pg_cursor = connection.cursor() + pg_cursor.execute( + "SELECT * FROM pg_database db WHERE db.datname='%s'" + % database_name) + if pg_cursor.fetchall(): + old_isolation_level = connection.isolation_level + connection.set_isolation_level(0) + pg_cursor.execute('''DROP DATABASE "%s"''' % database_name) + connection.set_isolation_level(old_isolation_level) + connection.commit() + connection.close() except Exception as exception: exception = "%s: line:%s %s" % ( file_name, sys.exc_traceback.tb_lineno, exception) @@ -235,18 +232,17 @@ def create_test_server(server_info): db_id = create_database(server_info, test_db_name) # Add server info to test_server_dict - regression.test_server_dict["server"].append({"server_id": srv_id, + regression.parent_node_dict["server"].append({"server_id": srv_id, "server": server_info}) - regression.test_server_dict["database"].append({"server_id": srv_id, + regression.parent_node_dict["database"].append({"server_id": srv_id, "db_id": db_id, "db_name": test_db_name}) def delete_test_server(tester): - test_server_dict = regression.test_server_dict + test_server_dict = regression.parent_node_dict test_servers = test_server_dict["server"] test_databases = test_server_dict["database"] - test_table_spaces = test_server_dict["tablespace"] try: for test_server in test_servers: srv_id = test_server["server_id"] @@ -268,8 +264,8 @@ def delete_test_server(tester): print(exception, file=sys.stderr) # Clear test_server_dict - for item in regression.test_server_dict: - del regression.test_server_dict[item][:] + for item in regression.parent_node_dict: + del regression.parent_node_dict[item][:] def remove_db_file(): @@ -281,6 +277,7 @@ def 
remove_db_file(): def _drop_objects(tester): """This function use to cleanup the created the objects(servers, databases, schemas etc) during the test suite run""" + try: conn = sqlite3.connect(config.SQLITE_PATH) cur = conn.cursor() @@ -319,9 +316,7 @@ def _drop_objects(tester): server_info[1], server_info[2]) # Do not drop the default databases - if db[0] not in ["postgres", "template1", - "template0"]: - drop_database(connection, db[0]) + drop_database(connection, db[0]) # Delete tablespace connection = get_db_connection(server_info[3], @@ -373,7 +368,7 @@ def _drop_objects(tester): delete_server(tester, server_id) except Exception as exception: exception = "Exception while deleting server: %s:" \ - " line:%s %s" %\ + " line:%s %s" % \ (file_name, sys.exc_traceback.tb_lineno, exception) print(exception, file=sys.stderr)
-- Sent via pgadmin-hackers mailing list (pgadmin-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgadmin-hackers