Hi All

I have added logic to support asynchronous connections in psycopg2. This
functionality will be required for the Debugger module, so I am sending the
patch. Attached is the patch file; please review it and let me know if you
have any comments. If it looks good, please commit the code.

-- 
*Akshay Joshi*
*Principal Software Engineer *



*Phone: +91 20-3058-9517 Mobile: +91 976-788-8246*
diff --git a/web/pgadmin/utils/driver/abstract.py b/web/pgadmin/utils/driver/abstract.py
index d6eb5f9..5fff211 100644
--- a/web/pgadmin/utils/driver/abstract.py
+++ b/web/pgadmin/utils/driver/abstract.py
@@ -67,7 +67,7 @@ class BaseConnection(object):
         It is a base class for database connection. A different connection
         drive must implement this to expose abstract methods for this server.
 
-        General idea is to create a wrapper around the actaul driver
+        General idea is to create a wrapper around the actual driver
         implementation. It will be instantiated by the driver factory
         basically. And, they should not be instantiated directly.
 
@@ -82,9 +82,12 @@ class BaseConnection(object):
       - Implement this method to execute the given query and returns single
         datum result.
 
+    * execute_async(query, params)
+      - Implement this method to execute the given query asynchronously and returns result.
+
     * execute_2darray(query, params)
       - Implement this method to execute the given query and returns the result
-        as a 2 dimentional array.
+        as a 2 dimensional array.
 
     * execute_dict(query, params)
       - Implement this method to execute the given query and returns the result
@@ -113,8 +116,22 @@ class BaseConnection(object):
       NOTE: Please use BaseDriver.release_connection(...) for releasing the
             connection object for better memory management, and connection pool
             management.
+
+    * _wait(conn)
+      - Implement this method to wait for asynchronous connection. This is a blocking call.
+
+    * _wait_timeout(conn, time)
+      - Implement this method to wait for asynchronous connection with timeout. This is a non blocking call.
+
+    * poll()
+      - Implement this method to poll the data of query running on asynchronous connection.
     """
 
+    ASYNC_OK = 1
+    ASYNC_READ_TIMEOUT = 2
+    ASYNC_WRITE_TIMEOUT = 3
+    ASYNC_NOT_CONNECTED = 4
+
     @abstractmethod
     def connect(self, **kwargs):
         pass
@@ -124,6 +141,10 @@ class BaseConnection(object):
         pass
 
     @abstractmethod
+    def execute_async(self, query, params=None):
+        pass
+
+    @abstractmethod
     def execute_2darray(self, query, params=None):
         pass
 
@@ -150,3 +171,16 @@ class BaseConnection(object):
     @abstractmethod
     def _release(self):
         pass
+
+    @abstractmethod
+    def _wait(self, conn):
+        pass
+
+    @abstractmethod
+    def _wait_timeout(self, conn, time):
+        pass
+
+    @abstractmethod
+    def poll(self):
+        pass
+
diff --git a/web/pgadmin/utils/driver/psycopg2/__init__.py b/web/pgadmin/utils/driver/psycopg2/__init__.py
index 3badf3f..9e1c292 100644
--- a/web/pgadmin/utils/driver/psycopg2/__init__.py
+++ b/web/pgadmin/utils/driver/psycopg2/__init__.py
@@ -21,12 +21,15 @@ from ..abstract import BaseDriver, BaseConnection
 from pgadmin.settings.settings_model import Server, User
 from pgadmin.utils.crypto import decrypt
 import random
+import select
 
 from .keywords import ScanKeyword
 
 
 _ = gettext
 
+ASYNC_WAIT_TIMEOUT = 0.1 # in seconds or 100 milliseconds
+
 
 class Connection(BaseConnection):
     """
@@ -43,8 +46,11 @@ class Connection(BaseConnection):
     * execute_scalar(query, params)
       - Execute the given query and returns single datum result
 
+    * execute_async(query, params)
+      - Execute the given query asynchronously and returns result.
+
     * execute_2darray(query, params)
-      - Execute the given query and returns the result as a 2 dimentional
+      - Execute the given query and returns the result as a 2 dimensional
         array.
 
     * execute_dict(query, params)
@@ -59,15 +65,24 @@ class Connection(BaseConnection):
       - Reconnect the database server (if possible)
 
     * transaction_status()
-      - Trasaction Status
+      - Transaction Status
 
     * ping()
       - Ping the server.
 
     * _release()
       - Release the connection object of psycopg2
+
+    * _wait(conn)
+      - This method is used to wait for asynchronous connection. This is a blocking call.
+
+    * _wait_timeout(conn, time)
+      - This method is used to wait for asynchronous connection with timeout. This is a non blocking call.
+
+    * poll()
+      - This method is used to poll the data of query running on asynchronous connection.
     """
-    def __init__(self, manager, conn_id, db, auto_reconnect=True):
+    def __init__(self, manager, conn_id, db, auto_reconnect=True, async=0):
         assert(manager is not None)
         assert(conn_id is not None)
 
@@ -76,6 +91,9 @@ class Connection(BaseConnection):
         self.db = db if db is not None else manager.db
         self.conn = None
         self.auto_reconnect = auto_reconnect
+        self.async = async
+        self.__async_cursor = None
+        self.__async_query_id = None
 
         super(Connection, self).__init__()
 
@@ -123,9 +141,15 @@ class Connection(BaseConnection):
                     port=mgr.port,
                     database=self.db,
                     user=mgr.user,
-                    password=password
+                    password=password,
+                    async=self.async
                     )
 
+            # If connection is asynchronous then we will have to wait
+            # until the connection is ready to use.
+            if self.async == 1:
+                self._wait(pg_conn)
+
         except psycopg2.Error as e:
             if e.pgerror:
                 msg = e.pgerror
@@ -144,10 +168,14 @@ Failed to connect to the database server(#{server_id}) for connection ({conn_id}
 
             return False, msg
 
-        pg_conn.autocommit = True
         self.conn = pg_conn
 
-        self.conn.set_client_encoding("UNICODE")
+        # Setting autocommit and client encoding does not work on an asynchronous
+        # connection; by default an asynchronous connection runs in autocommit mode.
+        if self.async == 0:
+            self.conn.autocommit = True
+            self.conn.set_client_encoding("UNICODE")
+
         status, res = self.execute_scalar("""
 SET DateStyle=ISO;
 SET client_min_messages=notice;
@@ -308,6 +336,22 @@ Attempt to reconnect it failed with the below error:
 
         return True, cur
 
+    def __internal_blocking_execute(self, cur, query, params):
+        """
+        This function executes the query using cursor's execute function,
+        but in case of asynchronous connection we need to wait for the
+        transaction to be completed. If self.async is 1 then it is a
+        blocking call.
+
+        Args:
+            cur: Cursor object
+            query: SQL query to run.
+            params: Extra parameters
+        """
+        cur.execute(query, params)
+        if self.async == 1:
+            self._wait(cur.connection)
+
     def execute_scalar(self, query, params=None):
         status, cur = self.__cursor()
 
@@ -324,7 +368,7 @@ Attempt to reconnect it failed with the below error:
                     )
                 )
         try:
-            cur.execute(query, params)
+            self.__internal_blocking_execute(cur, query, params)
         except psycopg2.Error as pe:
             cur.close()
             errmsg = str(pe)
@@ -346,6 +390,48 @@ Attempt to reconnect it failed with the below error:
 
         return True, None
 
+    def execute_async(self, query, params=None):
+        """
+        This function executes the given query asynchronously and returns result.
+
+        Args:
+            query: SQL query to run.
+            params: extra parameters to the function
+        """
+        status, cur = self.__cursor()
+
+        if not status:
+            return False, str(cur)
+        query_id = random.randint(1, 9999999)
+
+        current_app.logger.log(25,
+                "Execute (async) for server #{server_id} - {conn_id} (Query-id: {query_id}):\n{query}".format(
+                    server_id=self.manager.sid,
+                    conn_id=self.conn_id,
+                    query=query,
+                    query_id=query_id
+                    )
+                )
+        try:
+            cur.execute(query, params)
+            res = self._wait_timeout(cur.connection, ASYNC_WAIT_TIMEOUT)
+        except psycopg2.Error as pe:
+            errmsg = str(pe)
+            current_app.logger.error(
+                    "Failed to execute query (execute_async) for the server #{server_id} - {conn_id} (Query-id: {query_id}):\nError Message:{errmsg}".format(
+                        server_id=self.manager.sid,
+                        conn_id=self.conn_id,
+                        query=query,
+                        errmsg=errmsg,
+                        query_id=query_id
+                        )
+                    )
+            return False, errmsg
+        self.__async_cursor = cur
+        self.__async_query_id = query_id
+
+        return True, res
+
     def execute_2darray(self, query, params=None):
         status, cur = self.__cursor()
 
@@ -362,7 +448,7 @@ Attempt to reconnect it failed with the below error:
                     )
                 )
         try:
-            cur.execute(query, params)
+            self.__internal_blocking_execute(cur, query, params)
         except psycopg2.Error as pe:
             cur.close()
             errmsg = str(pe)
@@ -405,7 +491,7 @@ Attempt to reconnect it failed with the below error:
                     )
                 )
         try:
-            cur.execute(query, params)
+            self.__internal_blocking_execute(cur, query, params)
         except psycopg2.Error as pe:
             cur.close()
             errmsg = str(pe)
@@ -497,6 +583,82 @@ Failed to reset the connection of the server due to following error:
             self.conn.close()
             self.conn = None
 
+    def _wait(self, conn):
+        """
+        This function is used for the asynchronous connection,
+        it will call poll method in an infinite loop till poll
+        returns psycopg2.extensions.POLL_OK. This is a blocking
+        call.
+
+        Args:
+            conn: connection object
+        """
+
+        while 1:
+            state = conn.poll()
+            if state == psycopg2.extensions.POLL_OK:
+                break
+            elif state == psycopg2.extensions.POLL_WRITE:
+                select.select([], [conn.fileno()], [])
+            elif state == psycopg2.extensions.POLL_READ:
+                select.select([conn.fileno()], [], [])
+            else:
+                raise psycopg2.OperationalError("poll() returned %s from _wait function" % state)
+
+    def _wait_timeout(self, conn, time):
+        """
+        This function is used for the asynchronous connection,
+        it will call poll method and return the status. If the state is
+        psycopg2.extensions.POLL_WRITE or psycopg2.extensions.POLL_READ, the
+        function will wait for the given timeout. This is not a blocking call.
+
+        Args:
+            conn: connection object
+            time: wait time
+        """
+
+        state = conn.poll()
+        if state == psycopg2.extensions.POLL_OK:
+            return self.ASYNC_OK
+        elif state == psycopg2.extensions.POLL_WRITE:
+            select.select([], [conn.fileno()], [], time)
+            return self.ASYNC_WRITE_TIMEOUT
+        elif state == psycopg2.extensions.POLL_READ:
+            select.select([conn.fileno()], [], [], time)
+            return self.ASYNC_READ_TIMEOUT
+        else:
+            raise psycopg2.OperationalError("poll() returned %s from _wait_timeout function" % state)
+
+    def poll(self):
+        """
+        This function is a wrapper around connection's poll function.
+        It internally uses the _wait_timeout method to poll the
+        result on the connection object. In case of success it
+        returns the result of the query.
+        """
+
+        cur = self.__async_cursor
+        if not cur:
+            return False, gettext("Cursor could not found for the aysnc connection."), None
+
+        current_app.logger.log(25, "Polling result for (Query-id: {query_id})".format(
+                    query_id=self.__async_query_id
+            )
+        )
+
+        status = self._wait_timeout(self.conn, ASYNC_WAIT_TIMEOUT)
+        if status == self.ASYNC_OK:
+            if cur.rowcount > 0:
+                # Fetch the column information
+                colinfo = [desc for desc in cur.description]
+                result = []
+                # Fetch the data rows.
+                for row in cur:
+                    result.append(dict(row))
+                self.__async_cursor = None
+                return status, result, colinfo
+        return status, None, None
+
 
 class ServerManager(object):
     """
@@ -588,7 +750,7 @@ WHERE db.oid = {0}""".format(did))
 
                         if did not in self.db_info:
                             raise Exception(gettext(
-                                "Coudn't find the database!"
+                                "Couldn't find the database!"
                                 ))
 
         if database is None:
@@ -602,8 +764,9 @@ WHERE db.oid = {0}""".format(did))
         if my_id in self.connections:
             return self.connections[my_id]
         else:
+            async = 1 if conn_id is not None else 0
             self.connections[my_id] = Connection(
-                    self, my_id, database, auto_reconnect
+                    self, my_id, database, auto_reconnect, async
                     )
 
             return self.connections[my_id]
@@ -785,7 +948,7 @@ class Driver(BaseDriver):
                 sess_mgr['pinged'] = curr_time
                 continue
 
-            if (curr_time - sess_mgr['pinged'] >= session_idle_timeout):
+            if curr_time - sess_mgr['pinged'] >= session_idle_timeout:
                 for mgr in [m for m in sess_mgr if isinstance(m,
                         ServerManager)]:
                     mgr.release()
-- 
Sent via pgadmin-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgadmin-hackers

Reply via email to