Author: dpage
Date: 2005-05-20 22:49:23 +0100 (Fri, 20 May 2005)
New Revision: 4229

Modified:
   trunk/pgadmin3/xtra/pgagent/connection.cpp
   trunk/pgadmin3/xtra/pgagent/include/job.h
   trunk/pgadmin3/xtra/pgagent/job.cpp
   trunk/pgadmin3/xtra/pgagent/pgAgent.cpp
Log:
Add threading support to pgAgent to allow multiple jobs to start in the same 
timeslot. Seems to work OK with no memory leaks or sync problems, however more 
testing is required. Also need to add code to keep track of threads that are 
created so they can be killed if the app terminates.

Modified: trunk/pgadmin3/xtra/pgagent/connection.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/connection.cpp  2005-05-20 15:44:01 UTC (rev 
4228)
+++ trunk/pgadmin3/xtra/pgagent/connection.cpp  2005-05-20 21:49:23 UTC (rev 
4229)
@@ -13,6 +13,7 @@
 
 DBconn *DBconn::primaryConn;
 wxString DBconn::basicConnectString;
+static wxMutex s_PoolLock;
 
 DBconn::DBconn(const wxString &name)
 {
@@ -64,6 +65,8 @@
 
 DBconn *DBconn::InitConnection(const wxString &connectString)
 {
+       wxMutexLocker lock(s_PoolLock);
+
     basicConnectString=connectString;
     wxString dbname;
 
@@ -96,6 +99,8 @@
 
 DBconn *DBconn::Get(const wxString &dbname)
 {
+       wxMutexLocker lock(s_PoolLock);
+
        DBconn *thisConn = primaryConn, *testConn;
 
     // find an existing connection
@@ -132,50 +137,77 @@
 
 void DBconn::Return()
 {
+       wxMutexLocker lock(s_PoolLock);
+
        LogMessage(_("Returning connection to database ") + this->dbname, 
LOG_DEBUG);
        inUse = false;
 }
 
 void DBconn::ClearConnections(bool all)
 {
+       wxMutexLocker lock(s_PoolLock);
+
        if (all)
                LogMessage(_("Clearing all connections"), LOG_DEBUG);
        else
                LogMessage(_("Clearing inactive connections"), LOG_DEBUG);
 
        DBconn *thisConn=primaryConn, *deleteConn;
+       int total=0, free=0, deleted=0;
 
-       // Find the last connection
-       while (thisConn->next != 0)
-               thisConn = thisConn->next;
+       if (thisConn)
+       {
 
-       // Delete connections as required
-       // If a connection is not in use, delete it, and reset the next and 
previous
-       // pointers appropriately. If it is in use, don't touch it.
-       while (thisConn->prev != 0)
-       {
-               if ((!thisConn->inUse) || all)
+               total++;
+
+               // Find the last connection
+               while (thisConn->next != 0)
                {
-                       deleteConn = thisConn;
+                       total++;
+                       
+                       if (!thisConn->inUse)
+                               free++;
 
-                       thisConn = deleteConn->prev;
-                       
-                       thisConn->next = deleteConn->next;
-                       
-                       if (deleteConn->next)
-                               deleteConn->next->prev = deleteConn->prev;
-                       
-                       delete deleteConn;
+                       thisConn = thisConn->next;
                }
-               else
+               if (!thisConn->inUse)
+                       free++;
+
+               // Delete connections as required
+               // If a connection is not in use, delete it, and reset the next 
and previous
+               // pointers appropriately. If it is in use, don't touch it.
+               while (thisConn->prev != 0)
                {
-                       thisConn = thisConn->prev;
+                       if ((!thisConn->inUse) || all)
+                       {
+                               deleteConn = thisConn;
+                               thisConn = deleteConn->prev;
+                               thisConn->next = deleteConn->next;
+                               if (deleteConn->next)
+                                       deleteConn->next->prev = 
deleteConn->prev;
+                               delete deleteConn;
+                               deleted++;
+                       }
+                       else
+                       {
+                               thisConn = thisConn->prev;
+                       }
                }
+
+               if (all)
+               {
+                       delete thisConn;
+                       deleted++;
+               }
+
+               wxString tmp;
+               tmp.Printf(_("Connection stats: total - %d, free - %d, deleted 
- %d"), total, free, deleted);
+               LogMessage(tmp, LOG_DEBUG);
+
        }
+       else
+               LogMessage(_("No connections found!"), LOG_DEBUG);
 
-       if (all)
-               delete thisConn;
-
 }
 
 

Modified: trunk/pgadmin3/xtra/pgagent/include/job.h
===================================================================
--- trunk/pgadmin3/xtra/pgagent/include/job.h   2005-05-20 15:44:01 UTC (rev 
4228)
+++ trunk/pgadmin3/xtra/pgagent/include/job.h   2005-05-20 21:49:23 UTC (rev 
4229)
@@ -37,11 +37,14 @@
 public:
     JobThread(const wxString &jid);
     ~JobThread();
+       bool Runnable() { return runnable; }
 
        virtual void *Entry();
 
 private:
        wxString jobid;
+       bool runnable;
+       Job *job;
 };
 
 #endif // JOB_H

Modified: trunk/pgadmin3/xtra/pgagent/job.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/job.cpp 2005-05-20 15:44:01 UTC (rev 4228)
+++ trunk/pgadmin3/xtra/pgagent/job.cpp 2005-05-20 21:49:23 UTC (rev 4229)
@@ -11,12 +11,16 @@
 
 #include "pgAgent.h"
 
+wxSemaphore *getDb;
+
 Job::Job(DBconn *conn, const wxString &jid)
 {
     threadConn=conn;
     jobid=jid;
     status=wxT("");
 
+       LogMessage(_("Starting job: ") + jobid, LOG_DEBUG);
+
     int rc=threadConn->ExecuteVoid(
         wxT("UPDATE pgagent.pga_job SET jobagentid=") + backendPid + wxT(", 
joblastrun=now() ")
         wxT(" WHERE jobagentid IS NULL AND jobid=") + jobid);
@@ -58,6 +62,8 @@
             );
     }
        threadConn->Return();
+
+       LogMessage(_("Completed job: ") + jobid, LOG_DEBUG);
 }
 
 
@@ -117,7 +123,7 @@
                 stepConn=DBconn::Get(steps->GetString(wxT("jstdbname")));
                 if (stepConn)
                 {
-                    LogMessage(_("Executing step ") + stepid + _(" on database 
") + steps->GetString(wxT("jstdbname")), LOG_DEBUG);
+                    LogMessage(_("Executing step ") + stepid + _(" (part of 
job ") + jobid + wxT(")"), LOG_DEBUG);
                     rc=stepConn->ExecuteVoid(steps->GetString(wxT("jstcode")));
                                        stepConn->Return();
                 }
@@ -168,7 +174,16 @@
 : wxThread(wxTHREAD_DETACHED)
 { 
        LogMessage(_("Creating job thread for job ") + jid, LOG_DEBUG); 
+       
+       runnable = false;
        jobid = jid; 
+
+       DBconn *threadConn=DBconn::Get(serviceDBname);
+    job = new Job(threadConn, jobid);
+
+    if (job->Runnable())
+        runnable = true;
+
 }
     
 
@@ -180,7 +195,11 @@
 
 void *JobThread::Entry()
 {
-       LogMessage(_("Running job thread for job ") + jobid, LOG_DEBUG);
+       if (runnable)
+       {
+               job->Execute();
+               delete job;
+       }
 
        return(NULL);
 }
\ No newline at end of file

Modified: trunk/pgadmin3/xtra/pgagent/pgAgent.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/pgAgent.cpp     2005-05-20 15:44:01 UTC (rev 
4228)
+++ trunk/pgadmin3/xtra/pgagent/pgAgent.cpp     2005-05-20 21:49:23 UTC (rev 
4229)
@@ -89,32 +89,25 @@
 
         if (res)
         {
-            wxString jobid=res->GetString(wxT("jobid"));
-            delete res;
+                       while(res->HasData())
+                       {
+                               wxString jobid=res->GetString(wxT("jobid"));
 
-            if (jobid != wxT(""))
-            {
-                               DBconn *threadConn=DBconn::Get(serviceDBname);
-                Job job(threadConn, jobid);
+                               JobThread *jt = new JobThread(jobid);
+       
+                               if (jt->Runnable())
+                               {
+                                       jt->Create();
+                                       jt->Run();
+                                       foundJobToExecute = true;
+                               }
+                               res->MoveNext();
 
-                if (job.Runnable())
-                {
-                    foundJobToExecute=true;
-                    LogMessage(_("Running job: ") + jobid, LOG_DEBUG);
-                                       
-                                       // JobThread *jt = new JobThread(jobid);
-                                       // jt->Run();
-                                       // jt->Wait();
-                    
-                                       job.Execute();
-                                       LogMessage(_("Completed job: ") + 
jobid, LOG_DEBUG);
-                }
-            }
-            else
-            {
-                               LogMessage(_("No jobs to run - sleeping..."), 
LOG_DEBUG);
-                WaitAWhile();
-            }
+                       }
+
+                       delete res;
+                       LogMessage(_("Sleeping..."), LOG_DEBUG);
+            WaitAWhile();
         }
         else
         {


---------------------------(end of broadcast)---------------------------
TIP 7: don't forget to increase your free space map settings

Reply via email to