Hi Dave,
Please find the attached patch for Cast node. (Changes required due to
drop objects functionality).
To run test cases please enter following command:
python regression/runtests.py --pkg
browser.server_groups.servers.databases.casts
Let me know if any changes required.
--
Best,
Priyanka
EnterpriseDB Corporation
The Enterprise PostgreSQL Company
diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/__init__.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/__init__.py
index bfb0263..b86b208 100644
--- a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/__init__.py
+++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/__init__.py
@@ -13,4 +13,4 @@ from pgadmin.utils.route import BaseTestGenerator
class CastTestGenerator(BaseTestGenerator):
def runTest(self):
- return []
+ return []
diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_add.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_add.py
index 0dff330..bfa836e 100644
--- a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_add.py
+++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_add.py
@@ -7,12 +7,18 @@
#
# ##################################################################
+from __future__ import print_function
from pgadmin.utils.route import BaseTestGenerator
+from regression import parent_node_dict
from regression import test_utils as utils
from . import utils as cast_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
-from pgadmin.browser.server_groups.servers.tests import utils as server_utils
+import json
+import os
+import sys
+
+file_name = os.path.basename(__file__)
class CastsAddTestCase(BaseTestGenerator):
@@ -21,45 +27,48 @@ class CastsAddTestCase(BaseTestGenerator):
('Check Cast Node', dict(url='/browser/cast/obj/'))
]
- @classmethod
- def setUpClass(cls):
- """
- This function perform the following tasks:
- 1. Add and connect to the test server(s)
- 2. Add database(s) connected to server(s)
-
- :return: None
- """
-
- # Add the server
- server_utils.add_server(cls.tester)
-
- # Connect to server
- cls.server_connect_response, cls.server_group, cls.server_ids = \
- server_utils.connect_server(cls.tester)
-
- if len(cls.server_connect_response) == 0:
- raise Exception("No Server(s) connected to add the database!!!")
-
- # Add database
- database_utils.add_database(cls.tester, cls.server_connect_response,
- cls.server_ids)
-
def runTest(self):
- """ This function will add cast under database node. """
+ """ This function will add cast under test database. """
+
+ self.server_data = parent_node_dict["database"][0]
+ self.server_id = self.server_data["server_id"]
+ self.db_id = self.server_data['db_id']
+ db_con = database_utils.connect_database(self,
+ utils.SERVER_GROUP,
+ self.server_id,
+ self.db_id)
+ if not db_con["info"] == "Database connected.":
+ raise Exception("Could not connect to database.")
+ try:
- cast_utils.add_cast(self.tester)
+ self.data = cast_utils.get_cast_data()
- @classmethod
- def tearDownClass(cls):
- """
- This function deletes the added cast, database, server and the
- 'parent_id.pkl' file which is created in setUpClass.
+ response = self.tester.post(
+ self.url + str(utils.SERVER_GROUP) + '/' +
+ str(self.server_id) + '/' + str(
+ self.db_id) + '/',
+ data=json.dumps(self.data),
+ content_type='html/json')
+ self.assertEquals(response.status_code, 200)
+ response_data = json.loads(response.data.decode('utf-8'))
+ cast_id = response_data['node']['_id']
+ cast_dict = {"cast_id": cast_id}
+ utils.write_node_info(int(self.server_id), "cid", cast_dict)
+ except Exception as exception:
+ exception = "Exception: %s: line:%s %s" % (
+ file_name, sys.exc_traceback.tb_lineno, exception)
+ print(exception, file=sys.stderr)
+ raise Exception(exception)
- :return: None
- """
+ def tearDown(self):
+ """This function disconnect the test database and drop added cast."""
- cast_utils.delete_cast(cls.tester)
- database_utils.delete_database(cls.tester)
- server_utils.delete_server(cls.tester)
- utils.delete_parent_id_file()
+ connection = utils.get_db_connection(self.server_data['db_name'],
+ self.server['username'],
+ self.server['db_password'],
+ self.server['host'],
+ self.server['port'])
+ cast_utils.drop_cast(connection, self.data["srctyp"],
+ self.data["trgtyp"])
+ database_utils.disconnect_database(self, self.server_id,
+ self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_delete.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_delete.py
index f3a92c9..b6ab0b9 100644
--- a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_delete.py
+++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_delete.py
@@ -1,102 +1,86 @@
-# # #################################################################
-# #
-# # pgAdmin 4 - PostgreSQL Tools
-# #
-# # Copyright (C) 2013 - 2016, The pgAdmin Development Team
-# # This software is released under the PostgreSQL Licence
-# #
-# # ##################################################################
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+from __future__ import print_function
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
-from regression.test_utils import get_ids
+from regression import parent_node_dict
from . import utils as cast_utils
-from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
-from regression.test_setup import advanced_config_data
-import json
+import os
+import sys
+
+file_name = os.path.basename(__file__)
class CastsDeleteTestCase(BaseTestGenerator):
- """ This class will fetch the cast node added under database node. """
+ """ This class will delete the cast node added under database node. """
scenarios = [
# Fetching default URL for cast node.
('Check Cast Node', dict(url='/browser/cast/obj/'))
]
- @classmethod
- def setUpClass(cls):
- """
- This function perform the following tasks:
- 1. Add and connect to the test server(s)
- 2. Add database(s) connected to server(s)
- 3. Add cast(s) to databases
-
- :return: None
- """
-
- # Add the server
- server_utils.add_server(cls.tester)
+ def setUp(self):
- # Connect to server
- cls.server_connect_response, cls.server_group, cls.server_ids = \
- server_utils.connect_server(cls.tester)
+ self.default_db = self.server["db"]
+ self.database_info = parent_node_dict['database'][0]
+ self.db_name = self.database_info['db_name']
+ self.server["db"] = self.db_name
+ self.source_type = 'circle'
+ self.target_type = 'line'
- if len(cls.server_connect_response) == 0:
- raise Exception("No Server(s) connected to add the database!!!")
-
- # Add database
- database_utils.add_database(cls.tester, cls.server_connect_response,
- cls.server_ids)
-
- # Add cast(s) to database(s)
- cast_utils.add_cast(cls.tester)
+ self.cast_id = cast_utils.create_cast(self.server, self.source_type,
+ self.target_type)
def runTest(self):
- """ This function will delete added cast(s)."""
-
- all_id = get_ids()
- server_ids = all_id["sid"]
- db_ids_dict = all_id["did"][0]
- cast_ids_dict = all_id["cid"][0]
-
- for server_id in server_ids:
- db_id = db_ids_dict[int(server_id)]
- db_con = database_utils.verify_database(self.tester,
- utils.SERVER_GROUP,
- server_id,
- db_id)
- if len(db_con) == 0:
- raise Exception("No database(s) to delete for server id %s"
- % server_id)
- cast_id = cast_ids_dict[server_id]
- cast_get_data = cast_utils.verify_cast(self.tester,
- utils.SERVER_GROUP,
- server_id,
- db_id, cast_id)
-
- if cast_get_data.status_code == 200:
-
+ """ This function will delete added cast."""
+
+ self.server_id = self.database_info["server_id"]
+ self.db_id = self.database_info['db_id']
+
+ db_con = database_utils.connect_database(self,
+ utils.SERVER_GROUP,
+ self.server_id,
+ self.db_id)
+ if not db_con["info"] == "Database connected.":
+ raise Exception("Could not connect to database.")
+ try:
+ connection = utils.get_db_connection(self.server['db'],
+ self.server['username'],
+ self.server['db_password'],
+ self.server['host'],
+ self.server['port'])
+ response = cast_utils.verify_cast(connection, self.source_type,
+ self.target_type)
+
+ if len(response) == 0:
+ raise Exception("Could not find cast.")
+ else:
delete_response = self.tester.delete(
- self.url + str(utils.SERVER_GROUP) + '/' +
- str(server_id) + '/' + str(db_id) +
- '/' + str(cast_id),
- follow_redirects=True)
- response_data = json.loads(delete_response.data.decode('utf-8'))
- self.assertTrue(response_data['success'], 1)
-
- @classmethod
- def tearDownClass(cls):
- """
- This function delete the added cast, database, server and the
- 'parent_id.pkl' file which is created in setUpClass.
-
- :return: None
- """
-
- database_utils.delete_database(cls.tester)
- server_utils.delete_server(cls.tester)
- utils.delete_parent_id_file()
+ self.url + str(utils.SERVER_GROUP) + '/' +
+ str(self.server_id) + '/' + str(self.db_id) +
+ '/' + str(self.cast_id),
+ follow_redirects=True)
+ self.assertEquals(delete_response.status_code, 200)
+
+ except Exception as exception:
+ exception = "Exception: %s: line:%s %s" % (
+ file_name, sys.exc_traceback.tb_lineno, exception)
+ print(exception, file=sys.stderr)
+ raise Exception(exception)
+
+ def tearDown(self):
+ """This function will disconnect test database."""
+
+ database_utils.disconnect_database(self, self.server_id,
+ self.db_id)
+ self.server['db'] = self.default_db
diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get.py
index 20702e1..144c762 100644
--- a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get.py
+++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get.py
@@ -1,19 +1,23 @@
-# #################################################################
+##################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2016, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
-# ##################################################################
+###################################################################
+from __future__ import print_function
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
-from regression.test_utils import get_ids
+from regression import parent_node_dict
from . import utils as cast_utils
-from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
+import os
+import sys
+
+file_name = os.path.basename(__file__)
class CastsGetTestCase(BaseTestGenerator):
@@ -24,64 +28,53 @@ class CastsGetTestCase(BaseTestGenerator):
('Check Cast Node', dict(url='/browser/cast/obj/'))
]
- @classmethod
- def setUpClass(cls):
- """
- This function used to add the sever, database, and cast
-
- :return: None
- """
-
- # Add the server
- server_utils.add_server(cls.tester)
-
- # Connect to server
- cls.server_connect_response, cls.server_group, cls.server_ids = \
- server_utils.connect_server(cls.tester)
-
- if len(cls.server_connect_response) == 0:
- raise Exception("No Server(s) connected to add the database!!!")
+ def setUp(self):
+ """ This function will create cast."""
- # Add database
- database_utils.add_database(cls.tester, cls.server_connect_response,
- cls.server_ids)
+ self.default_db = self.server["db"]
+ self.database_info = parent_node_dict['database'][0]
+ self.db_name = self.database_info['db_name']
+ self.server["db"] = self.db_name
+ self.source_type = 'money'
+ self.target_type = 'bigint'
- cast_utils.add_cast(cls.tester)
+ self.cast_id = cast_utils.create_cast(self.server, self.source_type,
+ self.target_type)
def runTest(self):
- """ This function will get added cast."""
-
- all_id = get_ids()
- server_ids = all_id["sid"]
- db_ids_dict = all_id["did"][0]
- cast_ids_dict = all_id["cid"][0]
-
- for server_id in server_ids:
- db_id = db_ids_dict[int(server_id)]
- db_con = database_utils.verify_database(self.tester,
- utils.SERVER_GROUP,
- server_id,
- db_id)
- if len(db_con) == 0:
- raise Exception("No database(s) to delete for server id %s"
- % server_id)
- cast_id = cast_ids_dict[server_id]
- cast_get_data = cast_utils.verify_cast(self.tester,
- utils.SERVER_GROUP,
- server_id,
- db_id, cast_id)
- self.assertEquals(cast_get_data.status_code, 200)
-
- @classmethod
- def tearDownClass(cls):
- """
- This function deletes the added cast, database, server and the
- 'parent_id.pkl' file which is created in setup() function.
-
- :return: None
- """
-
- cast_utils.delete_cast(cls.tester)
- database_utils.delete_database(cls.tester)
- server_utils.delete_server(cls.tester)
- utils.delete_parent_id_file()
+ """ This function will fetch added cast."""
+
+ self.server_id = self.database_info["server_id"]
+ self.db_id = self.database_info['db_id']
+ db_con = database_utils.connect_database(self,
+ utils.SERVER_GROUP,
+ self.server_id,
+ self.db_id)
+ if not db_con["info"] == "Database connected.":
+ raise Exception("Could not connect to database.")
+ try:
+ response = self.tester.get(
+ self.url + str(utils.SERVER_GROUP) + '/' + str(
+ self.server_id) + '/' +
+ str(self.db_id) + '/' + str(self.cast_id),
+ content_type='html/json')
+ self.assertEquals(response.status_code, 200)
+ except Exception as exception:
+ exception = "Exception: %s: line:%s %s" % (
+ file_name, sys.exc_traceback.tb_lineno, exception)
+ print(exception, file=sys.stderr)
+ raise Exception(exception)
+
+ def tearDown(self):
+ """This function disconnect the test database and drop added cast."""
+
+ connection = utils.get_db_connection(self.server['db'],
+ self.server['username'],
+ self.server['db_password'],
+ self.server['host'],
+ self.server['port'])
+ cast_utils.drop_cast(connection, self.source_type,
+ self.target_type)
+ database_utils.disconnect_database(self, self.server_id,
+ self.db_id)
+ self.server['db'] = self.default_db
diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_put.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_put.py
index ea67c77..b2c378e 100644
--- a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_put.py
+++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_put.py
@@ -7,15 +7,18 @@
#
# ##################################################################
+from __future__ import print_function
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
-from regression.test_utils import get_ids
+from regression import parent_node_dict
from . import utils as cast_utils
-from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
-from regression.test_setup import advanced_config_data
import json
+import os
+import sys
+
+file_name = os.path.basename(__file__)
class CastsPutTestCase(BaseTestGenerator):
@@ -26,82 +29,73 @@ class CastsPutTestCase(BaseTestGenerator):
('Check Cast Node', dict(url='/browser/cast/obj/'))
]
- @classmethod
- def setUpClass(cls):
- """
- This function perform the following tasks:
- 1. Add and connect to the test server(s)
- 2. Add database(s) connected to server(s)
- 3. Add cast(s) to databases
-
- :return: None
- """
-
- # Add the server
- server_utils.add_server(cls.tester)
- # Connect to server
- cls.server_connect_response, cls.server_group, cls.server_ids = \
- server_utils.connect_server(cls.tester)
-
- if len(cls.server_connect_response) == 0:
- raise Exception("No Server(s) connected to add the database!!!")
+ def setUp(self):
+ """ This function will create cast."""
- # Add database
- database_utils.add_database(cls.tester, cls.server_connect_response,
- cls.server_ids)
+ self.default_db = self.server["db"]
+ self.database_info = parent_node_dict['database'][0]
+ self.db_name = self.database_info['db_name']
+ self.server["db"] = self.db_name
+ self.source_type = 'character'
+ self.target_type = 'cidr'
- cast_utils.add_cast(cls.tester)
+ self.cast_id = cast_utils.create_cast(self.server, self.source_type,
+ self.target_type)
def runTest(self):
""" This function will update added cast."""
- all_id = get_ids()
- server_ids = all_id["sid"]
- db_ids_dict = all_id["did"][0]
- cast_ids_dict = all_id["cid"][0]
-
- for server_id in server_ids:
- db_id = db_ids_dict[int(server_id)]
- db_con = database_utils.verify_database(self.tester,
- utils.SERVER_GROUP,
- server_id,
- db_id)
- if len(db_con) == 0:
- raise Exception("No database(s) to delete for server id %s"
- % server_id)
- cast_id = cast_ids_dict[server_id]
- cast_get_data = cast_utils.verify_cast(self.tester,
- utils.SERVER_GROUP,
- server_id,
- db_id, cast_id)
-
- if cast_get_data.status_code == 200:
-
+ self.server_id = self.database_info["server_id"]
+ self.db_id = self.database_info['db_id']
+
+ db_con = database_utils.connect_database(self,
+ utils.SERVER_GROUP,
+ self.server_id,
+ self.db_id)
+ if not db_con["info"] == "Database connected.":
+ raise Exception("Could not connect to database.")
+ try:
+ connection = utils.get_db_connection(self.server['db'],
+ self.server['username'],
+ self.server['db_password'],
+ self.server['host'],
+ self.server['port'])
+ response = cast_utils.verify_cast(connection, self.source_type,
+ self.target_type)
+
+ if len(response) == 0:
+ raise Exception("Could not find cast.")
+ else:
data = {
- "description": advanced_config_data["cast_update_data"]
- ["comment"],
- "id": cast_id
+ "description": "This is cast update comment",
+ "id": self.cast_id
}
put_response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
- str(server_id) + '/' + str(
- db_id) +
- '/' + str(cast_id),
+ str(self.server_id) + '/' + str(
+ self.db_id) +
+ '/' + str(self.cast_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(put_response.status_code, 200)
- @classmethod
- def tearDownClass(cls):
- """
- This function deletes the added cast, database, server and the
- 'parent_id.pkl' file which is created in setUpClass.
-
- :return: None
- """
-
- cast_utils.delete_cast(cls.tester)
- database_utils.delete_database(cls.tester)
- server_utils.delete_server(cls.tester)
- utils.delete_parent_id_file()
+ except Exception as exception:
+ exception = "Exception: %s: line:%s %s" % (
+ file_name, sys.exc_traceback.tb_lineno, exception)
+ print(exception, file=sys.stderr)
+ raise Exception(exception)
+
+ def tearDown(self):
+ """This function disconnect the test database and drop added cast."""
+
+ connection = utils.get_db_connection(self.server['db'],
+ self.server['username'],
+ self.server['db_password'],
+ self.server['host'],
+ self.server['port'])
+ cast_utils.drop_cast(connection, self.source_type,
+ self.target_type)
+ database_utils.disconnect_database(self, self.server_id,
+ self.db_id)
+ self.server['db'] = self.default_db
diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/utils.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/utils.py
index 0e89513..95becd2 100644
--- a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/utils.py
+++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/utils.py
@@ -7,139 +7,103 @@
#
# ##################################################################
+from __future__ import print_function
import os
-import pickle
-import json
-from regression.test_setup import advanced_config_data, pickle_path
-from regression import test_utils as utils
-from pgadmin.browser.server_groups.servers.databases.tests import \
- utils as database_utils
-from pgadmin.browser.server_groups.servers.tests import utils as server_utils
+import sys
+from regression.test_utils import get_db_connection
+file_name = os.path.basename(__file__)
CAST_URL = '/browser/cast/obj/'
-def get_cast_config_data(server_connect_data):
-
- adv_config_data = None
- db_user = server_connect_data['data']['user']['name']
-
- # Get the config data of appropriate db user
- for config_test_data in advanced_config_data['casts_credentials']:
- if db_user == config_test_data['owner']:
- adv_config_data = config_test_data
-
+def get_cast_data():
data = {
- "castcontext": adv_config_data
- ['cast_context'],
- "encoding": adv_config_data
- ['encoding'],
- "name": adv_config_data
- ['name'],
- "srctyp": adv_config_data
- ['source_type'],
- "trgtyp": adv_config_data
- ['target_type']
- }
+ "castcontext": "IMPLICIT",
+ "encoding": "UTF8",
+ "name": "money->bigint",
+ "srctyp": "money",
+ "trgtyp": "bigint",
+ }
return data
-def add_cast(tester):
- """
- This function add the cast in the existing database
-
- :param tester: test object
- :type tester: flask test object
- :return:None
- """
-
- all_id = utils.get_ids()
- server_ids = all_id["sid"]
- db_ids_dict = all_id["did"][0]
- server_group = utils.config_data['server_group']
-
- for server_id in server_ids:
- db_id = db_ids_dict[int(server_id)]
- db_con = database_utils.verify_database(tester, server_group,
- server_id, db_id)
- if db_con['data']['connected']:
- server_connect_response = server_utils.verify_server(
- tester, server_group, server_id)
-
- data = get_cast_config_data(server_connect_response)
-
- response = tester.post(CAST_URL + str(server_group) + '/' +
- str(server_id) + '/' + str(
- db_id) + '/',
- data=json.dumps(data),
- content_type='html/json')
-
- assert response.status_code == 200
- response_data = json.loads(response.data.decode('utf-8'))
- write_cast_info(response_data, server_id)
-
-
-def write_cast_info(response_data, server_id):
+def create_cast(server, source_type, target_type):
"""
- This function writes the server's details to file parent_id.pkl
+ This function used to create cast in the existing dummy database
- :param response_data: server's data
- :type response_data: list of dictionary
- :param pickle_id_dict: contains ids of server,database,tables etc.
- :type pickle_id_dict: dict
- :return: None
+ :param server: test_server, cast_source_type, cast_target_type
+ :return: cast_id
"""
- cast_id = response_data['node']['_id']
- pickle_id_dict = utils.get_pickle_id_dict()
- if os.path.isfile(pickle_path):
- existing_server_id = open(pickle_path, 'rb')
- tol_server_id = pickle.load(existing_server_id)
- pickle_id_dict = tol_server_id
- if 'cid' in pickle_id_dict:
- if pickle_id_dict['cid']:
- # Add the cast_id as value in dict
- pickle_id_dict["cid"][0].update({server_id: cast_id})
- else:
- # Create new dict with server_id and cast_id
- pickle_id_dict["cid"].append({server_id: cast_id})
- cast_output = open(pickle_path, 'wb')
- pickle.dump(pickle_id_dict, cast_output)
- cast_output.close()
-
-
-def verify_cast(tester, server_group, server_id, db_id, cast_id):
-
- cast_response = tester.get(CAST_URL + str(server_group) + '/' +
- str(server_id) + '/' + str(db_id) +
- '/' + str(cast_id),
- content_type='html/json')
- return cast_response
-
-
-def delete_cast(tester):
- all_id = utils.get_ids()
- server_ids = all_id["sid"]
- db_ids_dict = all_id["did"][0]
- cast_ids_dict = all_id["cid"][0]
-
- for server_id in server_ids:
- db_id = db_ids_dict[int(server_id)]
- db_con = database_utils.verify_database(tester, utils.SERVER_GROUP,
- server_id,
- db_id)
- if len(db_con) == 0:
- raise Exception("No database(s) to delete for server id %s"
- % server_id)
- cast_id = cast_ids_dict[server_id]
- cast_get_data = verify_cast(tester, utils.SERVER_GROUP,
- server_id,
- db_id, cast_id)
-
- if cast_get_data.status_code == 200:
- delete_response = tester.delete(
- CAST_URL + str(utils.SERVER_GROUP) + '/' +
- str(server_id) + '/' + str(db_id) +
- '/' + str(cast_id),
- follow_redirects=True)
- return delete_response
+ try:
+ connection = get_db_connection(server['db'],
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ old_isolation_level = connection.isolation_level
+ connection.set_isolation_level(0)
+ pg_cursor = connection.cursor()
+ pg_cursor.execute("CREATE CAST (%s AS %s) WITHOUT"
+ " FUNCTION AS IMPLICIT" % (source_type, target_type))
+
+ connection.set_isolation_level(old_isolation_level)
+ connection.commit()
+
+ # Get 'oid' from newly created cast
+ pg_cursor.execute(
+ "SELECT ca.oid FROM pg_cast ca WHERE ca.castsource = "
+ "(SELECT t.oid FROM pg_type t WHERE format_type(t.oid, NULL)='%s') "
+ "AND ca.casttarget = (SELECT t.oid FROM pg_type t WHERE "
+ "format_type(t.oid, NULL) = '%s')" % (source_type, target_type))
+ oid = pg_cursor.fetchone()
+ cast_id = ''
+ if oid:
+ cast_id = oid[0]
+ connection.close()
+ return cast_id
+ except Exception as exception:
+ exception = "Exception: %s: line:%s %s" % (
+ file_name, sys.exc_traceback.tb_lineno, exception)
+ print(exception, file=sys.stderr)
+
+
+def verify_cast(connection, source_type, target_type):
+ """ This function will verify current cast."""
+
+ try:
+ pg_cursor = connection.cursor()
+
+ pg_cursor.execute(
+ "SELECT * FROM pg_cast ca WHERE ca.castsource = "
+ "(SELECT t.oid FROM pg_type t WHERE format_type(t.oid, NULL)='%s') "
+ "AND ca.casttarget = (SELECT t.oid FROM pg_type t WHERE "
+ "format_type(t.oid, NULL) = '%s')" % (source_type, target_type))
+ casts = pg_cursor.fetchall()
+ connection.close()
+ return casts
+ except Exception as exception:
+ exception = "%s: line:%s %s" % (
+ file_name, sys.exc_traceback.tb_lineno, exception)
+ print(exception, file=sys.stderr)
+
+
+def drop_cast(connection, source_type, target_type):
+ """This function used to drop the cast"""
+
+ try:
+ pg_cursor = connection.cursor()
+ pg_cursor.execute(
+ "SELECT * FROM pg_cast ca WHERE ca.castsource = "
+ "(SELECT t.oid FROM pg_type t WHERE format_type(t.oid, NULL)='%s') "
+ "AND ca.casttarget = (SELECT t.oid FROM pg_type t WHERE "
+ "format_type(t.oid, NULL) = '%s')" % (source_type, target_type))
+ if pg_cursor.fetchall():
+ pg_cursor.execute(
+ "DROP CAST (%s AS %s) CASCADE" % (source_type, target_type))
+ connection.commit()
+ connection.close()
+ except Exception as exception:
+ exception = "%s: line:%s %s" % (
+ file_name, sys.exc_traceback.tb_lineno, exception)
+ print(exception, file=sys.stderr)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/tests/utils.py b/web/pgadmin/browser/server_groups/servers/databases/tests/utils.py
index 6b26863..c1ef3a7 100644
--- a/web/pgadmin/browser/server_groups/servers/databases/tests/utils.py
+++ b/web/pgadmin/browser/server_groups/servers/databases/tests/utils.py
@@ -115,7 +115,7 @@ def create_database(connection, db_name):
raise Exception("Error while creating database. %s" % exception)
-def verify_database(self, server_group, server_id, db_id):
+def connect_database(self, server_group, server_id, db_id):
"""
This function verifies that database is exists and whether it connect
successfully or not
diff --git a/web/pgadmin/utils/route.py b/web/pgadmin/utils/route.py
index 74269de..be75478 100644
--- a/web/pgadmin/utils/route.py
+++ b/web/pgadmin/utils/route.py
@@ -7,15 +7,17 @@
#
##############################################################
+from __future__ import print_function
import sys
import unittest
-
+import os
from abc import ABCMeta, abstractmethod
from importlib import import_module
from werkzeug.utils import find_modules
-
import config
+file_name = os.path.basename(__file__)
+
class TestsGeneratorRegistry(ABCMeta):
"""
@@ -69,8 +71,10 @@ class TestsGeneratorRegistry(ABCMeta):
try:
if "tests." in str(module_name):
cls.import_app_modules(module_name)
- except ImportError:
- pass
+ except Exception as exception:
+ exception = "%s: line:%s %s" % (
+ file_name, sys.exc_traceback.tb_lineno, exception)
+ print(exception, file=sys.stderr)
else:
for module_name in find_modules(pkg, False, True):
try:
@@ -78,8 +82,11 @@ class TestsGeneratorRegistry(ABCMeta):
# is False
if "pgadmin.browser.tests" not in module_name:
cls.import_app_modules(module_name)
- except ImportError:
- pass
+ except Exception as exception:
+ exception = "%s: line:%s %s" % (
+ file_name, sys.exc_traceback.tb_lineno, exception)
+ print(exception, file=sys.stderr)
+
import six
diff --git a/web/regression/__init__.py b/web/regression/__init__.py
index 6b7cb2f..937cbcf 100644
--- a/web/regression/__init__.py
+++ b/web/regression/__init__.py
@@ -28,10 +28,8 @@ node_info_dict = {
"seid": [] # sequence
}
-global test_server_dict
-test_server_dict = {
+global parent_node_dict
+parent_node_dict = {
"server": [],
- "database": [],
- "tablespace": [],
- "role": []
+ "database": []
}
diff --git a/web/regression/runtests.py b/web/regression/runtests.py
index 87b6004..8be9fff 100644
--- a/web/regression/runtests.py
+++ b/web/regression/runtests.py
@@ -10,7 +10,6 @@
""" This file collect all modules/files present in tests directory and add
them to TestSuite. """
from __future__ import print_function
-
import argparse
import os
import sys
@@ -35,7 +34,7 @@ if sys.path[0] != root:
from pgadmin import create_app
import config
-import test_setup
+from regression import test_setup
# Delete SQLite db file if exists
if os.path.isfile(config.TEST_SQLITE_PATH):
@@ -59,14 +58,14 @@ if pgadmin_credentials:
'login_password']
# Execute the setup file
-exec (open("setup.py").read())
+exec(open("setup.py").read())
# Get the config database schema version. We store this in pgadmin.model
# as it turns out that putting it in the config files isn't a great idea
from pgadmin.model import SCHEMA_VERSION
# Delay the import test_utils as it needs updated config.SQLITE_PATH
-import test_utils
+from regression import test_utils
config.SETTINGS_SCHEMA_VERSION = SCHEMA_VERSION
diff --git a/web/regression/test_utils.py b/web/regression/test_utils.py
index 731f1b6..a5cc10c 100644
--- a/web/regression/test_utils.py
+++ b/web/regression/test_utils.py
@@ -40,7 +40,7 @@ def login_tester_account(tester):
:type tester: flask test client object
:return: None
"""
- if os.environ['PGADMIN_SETUP_EMAIL'] and\
+ if os.environ['PGADMIN_SETUP_EMAIL'] and \
os.environ['PGADMIN_SETUP_PASSWORD']:
email = os.environ['PGADMIN_SETUP_EMAIL']
password = os.environ['PGADMIN_SETUP_PASSWORD']
@@ -145,22 +145,19 @@ def create_database(server, db_name):
def drop_database(connection, database_name):
"""This function used to drop the database"""
-
try:
- pg_cursor = connection.cursor()
- pg_cursor.execute("SELECT * FROM pg_database db WHERE db.datname='%s'"
- % database_name)
- if pg_cursor.fetchall():
- # Release pid if any process using database
- pg_cursor.execute("select pg_terminate_backend(pid) from"
- " pg_stat_activity where datname='%s'" %
- database_name)
- old_isolation_level = connection.isolation_level
- connection.set_isolation_level(0)
- pg_cursor.execute('''DROP DATABASE "%s"''' % database_name)
- connection.set_isolation_level(old_isolation_level)
- connection.commit()
- connection.close()
+ if database_name not in ["postgres", "template1", "template0"]:
+ pg_cursor = connection.cursor()
+ pg_cursor.execute(
+ "SELECT * FROM pg_database db WHERE db.datname='%s'"
+ % database_name)
+ if pg_cursor.fetchall():
+ old_isolation_level = connection.isolation_level
+ connection.set_isolation_level(0)
+ pg_cursor.execute('''DROP DATABASE "%s"''' % database_name)
+ connection.set_isolation_level(old_isolation_level)
+ connection.commit()
+ connection.close()
except Exception as exception:
exception = "%s: line:%s %s" % (
file_name, sys.exc_traceback.tb_lineno, exception)
@@ -235,18 +232,17 @@ def create_test_server(server_info):
db_id = create_database(server_info, test_db_name)
# Add server info to test_server_dict
- regression.test_server_dict["server"].append({"server_id": srv_id,
+ regression.parent_node_dict["server"].append({"server_id": srv_id,
"server": server_info})
- regression.test_server_dict["database"].append({"server_id": srv_id,
+ regression.parent_node_dict["database"].append({"server_id": srv_id,
"db_id": db_id,
"db_name": test_db_name})
def delete_test_server(tester):
- test_server_dict = regression.test_server_dict
+ test_server_dict = regression.parent_node_dict
test_servers = test_server_dict["server"]
test_databases = test_server_dict["database"]
- test_table_spaces = test_server_dict["tablespace"]
try:
for test_server in test_servers:
srv_id = test_server["server_id"]
@@ -268,8 +264,8 @@ def delete_test_server(tester):
print(exception, file=sys.stderr)
# Clear test_server_dict
- for item in regression.test_server_dict:
- del regression.test_server_dict[item][:]
+ for item in regression.parent_node_dict:
+ del regression.parent_node_dict[item][:]
def remove_db_file():
@@ -281,6 +277,7 @@ def remove_db_file():
def _drop_objects(tester):
"""This function use to cleanup the created the objects(servers, databases,
schemas etc) during the test suite run"""
+
try:
conn = sqlite3.connect(config.SQLITE_PATH)
cur = conn.cursor()
@@ -319,9 +316,7 @@ def _drop_objects(tester):
server_info[1],
server_info[2])
# Do not drop the default databases
- if db[0] not in ["postgres", "template1",
- "template0"]:
- drop_database(connection, db[0])
+ drop_database(connection, db[0])
# Delete tablespace
connection = get_db_connection(server_info[3],
@@ -373,7 +368,7 @@ def _drop_objects(tester):
delete_server(tester, server_id)
except Exception as exception:
exception = "Exception while deleting server: %s:" \
- " line:%s %s" %\
+ " line:%s %s" % \
(file_name, sys.exc_traceback.tb_lineno,
exception)
print(exception, file=sys.stderr)
--
Sent via pgadmin-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgadmin-hackers