Module: sip-router
Branch: mariusbucur/dmq
Commit: 78eea80ffdaf09dd6d247455d77c47a64eacae48
URL:    http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=78eea80ffdaf09dd6d247455d77c47a64eacae48

Author: Marius Bucur <[email protected]>
Committer: Marius Bucur <[email protected]>
Date:   Wed Apr  6 20:12:00 2011 +0300

temporarily changed the presence module to test dmq functionality

---

 modules_k/dmq/bind_dmq.h      |   14 ++++++++
 modules_k/dmq/dmq.c           |   71 ++++++++++++++++++++++++-----------------
 modules_k/dmq/dmq.h           |   30 +++++++++++++++++
 modules_k/dmq/dmq_worker.h    |    8 -----
 modules_k/presence/presence.c |   29 +++++++++++++++++
 5 files changed, 115 insertions(+), 37 deletions(-)

diff --git a/modules_k/dmq/bind_dmq.h b/modules_k/dmq/bind_dmq.h
index e69de29..9c515a4 100644
--- a/modules_k/dmq/bind_dmq.h
+++ b/modules_k/dmq/bind_dmq.h
@@ -0,0 +1,14 @@
+#ifndef BIND_DMQ_H
+#define BIND_DMQ_H
+
+#include "peer.h"
+
+typedef struct dmq_api {
+       register_dmq_peer_t register_dmq_peer;
+} dmq_api_t;
+
+typedef int (*bind_dmq_f)(dmq_api_t* api);
+
+int bind_dmq(dmq_api_t* api);
+
+#endif
\ No newline at end of file
diff --git a/modules_k/dmq/dmq.c b/modules_k/dmq/dmq.c
index f5aae9a..90b0ac2 100644
--- a/modules_k/dmq/dmq.c
+++ b/modules_k/dmq/dmq.c
@@ -47,11 +47,11 @@
 #include "../../lib/kmi/mi.h"
 #include "../../lib/kcore/hash_func.h"
 #include "dmq.h"
+#include "peer.h"
 #include "bind_dmq.h"
-#include "dmq_worker.h"
+#include "worker.h"
 #include "../../mod_fix.h"
 
-static int mi_child_init(void);
 static int mod_init(void);
 static int child_init(int);
 static void destroy(void);
@@ -74,15 +74,18 @@ sl_api_t slb;
 
 /** module variables */
 dmq_worker_t* workers;
+dmq_peer_list_t* peer_list;
 
 /** module functions */
 static int mod_init(void);
 static int child_init(int);
 static void destroy(void);
-static int fixup_dmq(void** param, int param_no);
+static int handle_dmq_fixup(void** param, int param_no);
 
 static cmd_export_t cmds[] = {
-       {"handle_dmq_message",  (cmd_function)handle_dmq_message, 0, fixup_dmq, 0, 
+       {"handle_dmq_message",  (cmd_function)handle_dmq_message, 0, handle_dmq_fixup, 0, 
+               REQUEST_ROUTE},
+       {"bind_dmq",        (cmd_function)bind_dmq, 0, 0, 0,
                REQUEST_ROUTE},
        {0, 0, 0, 0, 0, 0}
 };
@@ -93,7 +96,6 @@ static param_export_t params[] = {
 };
 
 static mi_export_t mi_cmds[] = {
-       {"cleanup", 0, 0, 0, mi_child_init},
        {0, 0, 0, 0, 0}
 };
 
@@ -117,7 +119,6 @@ struct module_exports exports = {
  * init module function
  */
 static int mod_init(void) {
-       int i = 0;
        
        if(register_mi_mod(exports.name, mi_cmds)!=0) {
                LM_ERR("failed to register MI commands\n");
@@ -136,23 +137,21 @@ static int mod_init(void) {
 
        /* load all TM stuff */
        if(load_tm_api(&tmb)==-1) {
-               LM_ERR("Can't load tm functions. Module TM not loaded?\n");
+               LM_ERR("can't load tm functions. TM module probably not loaded\n");
                return -1;
        }
        
-       /* fork worker processes */
+       /* load peer list - the list containing the module callbacks for dmq */
+       peer_list = init_peer_list();
+       
+       /* register worker processes */
+       register_procs(num_workers);
+       
+       /* allocate workers array */
        workers = shm_malloc(num_workers * sizeof(*workers));
-       for(i = 0; i < num_workers; i++) {
-               int newpid = fork_process(PROC_NOCHLDINIT, "DMQ WORKER", 0);
-               if(newpid < 0) {
-                       LM_ERR("failed to form process\n");
-                       return -1;
-               } else if(newpid == 0) {
-                       /* child */
-                       // worker loop
-               } else {
-                       workers[i].pid = newpid;
-               }
+       if(workers == NULL) {
+               LM_ERR("error in shm_malloc\n");
+               return -1;
        }
        
        startup_time = (int) time(NULL);
@@ -163,26 +162,40 @@ static int mod_init(void) {
  * Initialize children
  */
 static int child_init(int rank) {
-       if (rank==PROC_INIT || rank==PROC_MAIN || rank==PROC_TCP_MAIN) {
+       int i, newpid;
+       if (rank == PROC_MAIN) {
+               /* fork worker processes */
+               for(i = 0; i < num_workers; i++) {
+                       init_worker(&workers[i]);
+                       LM_DBG("starting worker process %d\n", i);
+                       newpid = fork_process(PROC_NOCHLDINIT, "DMQ WORKER", 0);
+                       if(newpid < 0) {
+                               LM_ERR("failed to form process\n");
+                               return -1;
+                       } else if(newpid == 0) {
+                               // child - this will loop forever
+                               worker_loop(i);
+                       } else {
+                               workers[i].pid = newpid;
+                       }
+               }
+               return 0;
+       }
+       if(rank == PROC_INIT || rank == PROC_TCP_MAIN) {
                /* do nothing for the main process */
                return 0;
        }
 
        pid = my_pid();
-       
-       if(library_mode)
-               return 0;
-
-       return 0;
-}
-
-static int mi_child_init(void) {
        return 0;
 }
 
-
 /*
  * destroy function
  */
 static void destroy(void) {
 }
+
+static int handle_dmq_fixup(void** param, int param_no) {
+       return 0;
+}
\ No newline at end of file
diff --git a/modules_k/dmq/dmq.h b/modules_k/dmq/dmq.h
index d4b13de..3bd9c84 100644
--- a/modules_k/dmq/dmq.h
+++ b/modules_k/dmq/dmq.h
@@ -1,3 +1,33 @@
+#ifndef DMQ_H
+#define DMQ_H
+
+#include "../../dprint.h"
+#include "../../error.h"
+#include "../../sr_module.h"
+#include "bind_dmq.h"
+#include "peer.h"
+#include "worker.h"
 
 #define DEFAULT_NUM_WORKERS    2
+
+extern int num_workers;
+extern dmq_worker_t* workers;
+
+static inline int dmq_load_api(dmq_api_t* api) {
+       bind_dmq_f binddmq;
+       binddmq = (bind_dmq_f)find_export("bind_dmq", 0, 0);
+       if ( binddmq == 0) {
+               LM_ERR("cannot find bind_dmq\n");
+               return -1;
+       }
+       if (binddmq(api) < 0)
+       {
+               LM_ERR("cannot bind dmq api\n");
+               return -1;
+       }
+       return 0;
+}
+
 int handle_dmq_message(struct sip_msg* msg, char* str1 ,char* str2);
+
+#endif
\ No newline at end of file
diff --git a/modules_k/dmq/dmq_worker.h b/modules_k/dmq/dmq_worker.h
deleted file mode 100644
index e20b075..0000000
--- a/modules_k/dmq/dmq_worker.h
+++ /dev/null
@@ -1,8 +0,0 @@
-
-
-struct dmq_worker {
-       void* queue;
-       int pid;
-};
-
-typedef struct dmq_worker dmq_worker_t;
\ No newline at end of file
diff --git a/modules_k/presence/presence.c b/modules_k/presence/presence.c
index a1cb13f..e32e527 100644
--- a/modules_k/presence/presence.c
+++ b/modules_k/presence/presence.c
@@ -69,6 +69,7 @@
 #include "../../lib/kmi/mi.h"
 #include "../../lib/kcore/hash_func.h"
 #include "../pua/hash.h"
+#include "../dmq/dmq.h"
 #include "presence.h"
 #include "publish.h"
 #include "subscribe.h"
@@ -105,6 +106,9 @@ char* to_tag_pref = "10";
 struct tm_binds tmb;
 /* SL API structure */
 sl_api_t slb;
+/* dmq API structure */
+dmq_api_t dmq;
+register_dmq_peer_t register_dmq;
 
 /** module functions */
 
@@ -206,6 +210,22 @@ struct module_exports exports= {
        child_init                      /* per-child init function */
 };
 
+int dmq_callback(struct sip_msg* msg) {
+       LM_ERR("it worked - dmq module triggered the presence callback [%ld %d]\n", time(0), my_pid());
+       sleep(4);
+       return 0;
+}
+
+static void add_dmq_peer() {
+       dmq_peer_t presence_peer;
+       presence_peer.peer_id.s = "presence";
+       presence_peer.peer_id.len = 8;
+       presence_peer.description.s = "presence";
+       presence_peer.description.len = 8;
+       presence_peer.callback = dmq_callback;
+       register_dmq(&presence_peer);
+}
+
 /**
  * init module function
  */
@@ -268,6 +288,15 @@ static int mod_init(void)
                return -1;
        }
        
+       if(dmq_load_api(&dmq) < 0) {
+               LM_ERR("cannot load dmq api\n");
+               return -1;
+       } else {
+               register_dmq = dmq.register_dmq_peer;
+               add_dmq_peer();
+               LM_DBG("presence-dmq loaded\n");
+       }
+       
        if(db_url.s== NULL)
        {
                LM_ERR("database url not set!\n");


_______________________________________________
sr-dev mailing list
[email protected]
http://lists.sip-router.org/cgi-bin/mailman/listinfo/sr-dev

Reply via email to