Module: sip-router
Branch: mariusbucur/dmq
Commit: ba31d863c9475bafc7d6073e3a6ebdd0b40e207a
URL:    http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=ba31d863c9475bafc7d6073e3a6ebdd0b40e207a

Author: Marius Bucur <[email protected]>
Committer: Marius Bucur <[email protected]>
Date:   Wed Apr  6 19:30:31 2011 +0300

Added support for binding the dmq module from within another module.

Also finished the implementation of the dmq worker queues.

---

 modules_k/dmq/bind_dmq.c |    7 +++
 modules_k/dmq/message.c  |   29 +++++++++++
 modules_k/dmq/message.h  |    2 +
 modules_k/dmq/peer.c     |   43 ++++++++++++++++
 modules_k/dmq/peer.h     |   35 +++++++++++++
 modules_k/dmq/worker.c   |  120 ++++++++++++++++++++++++++++++++++++++++++++++
 modules_k/dmq/worker.h   |   43 ++++++++++++++++
 7 files changed, 279 insertions(+), 0 deletions(-)

diff --git a/modules_k/dmq/bind_dmq.c b/modules_k/dmq/bind_dmq.c
new file mode 100644
index 0000000..6744753
--- /dev/null
+++ b/modules_k/dmq/bind_dmq.c
@@ -0,0 +1,7 @@
+#include "bind_dmq.h"
+#include "peer.h"
+
+/**
+ * Fill in the DMQ API structure handed over by another module.
+ *
+ * @param api structure to populate; its register_dmq_peer member is set to
+ *            this module's register_dmq_peer().
+ * @return 0 on success, -1 if api is NULL.
+ */
+int bind_dmq(dmq_api_t* api) {
+       if(!api) {
+               LM_ERR("invalid parameter value\n");
+               return -1;
+       }
+       api->register_dmq_peer = register_dmq_peer;
+       return 0;
+}
\ No newline at end of file
diff --git a/modules_k/dmq/message.c b/modules_k/dmq/message.c
new file mode 100644
index 0000000..55f385c
--- /dev/null
+++ b/modules_k/dmq/message.c
@@ -0,0 +1,29 @@
+#include "../../parser/parse_to.h"
+#include "../../parser/parse_uri.h" 
+#include "../../parser/parse_content.h"
+#include "../../parser/parse_from.h"
+#include "../../ut.h"
+#include "worker.h"
+#include "peer.h"
+#include "message.h"
+
+/**
+ * Dispatch an incoming DMQ request to the peer named in its request URI.
+ *
+ * The peer id is taken from the userinfo part of the request URI. When a
+ * registered peer matches, a job is queued for one of the DMQ workers.
+ *
+ * @param msg  the received SIP request
+ * @param str1 first script parameter, only logged here
+ * @param str2 second script parameter, only logged here
+ * @return 0 when the job was queued or when no peer matched, -1 when the
+ *         request URI cannot be parsed or the job cannot be queued.
+ *
+ * NOTE(review): add_dmq_job() stores the msg pointer in the job without
+ * copying the message - confirm that msg stays valid until the worker has
+ * run the peer callback.
+ */
+int handle_dmq_message(struct sip_msg* msg, char* str1, char* str2) {
+       dmq_peer_t* peer;
+       if ((parse_sip_msg_uri(msg) < 0) || (!msg->parsed_uri.user.s)) {
+               LM_ERR("cannot parse msg URI\n");
+               return -1;
+       }
+       LM_DBG("handle_dmq_message [%.*s %.*s] [%s %s]\n",
+              msg->first_line.u.request.method.len, msg->first_line.u.request.method.s,
+              msg->first_line.u.request.uri.len, msg->first_line.u.request.uri.s,
+              ZSW(str1), ZSW(str2));
+       /* the peer id is given as the userinfo part of the request URI */
+       peer = find_peer(msg->parsed_uri.user);
+       if(!peer) {
+               LM_DBG("no peer found for %.*s\n", msg->parsed_uri.user.len, msg->parsed_uri.user.s);
+               return 0;
+       }
+       LM_DBG("handle_dmq_message peer found: %.*s\n", msg->parsed_uri.user.len, msg->parsed_uri.user.s);
+       /* do not report success when the job never reached a worker queue */
+       if(add_dmq_job(msg, peer) < 0) {
+               LM_ERR("cannot queue job for peer %.*s\n", msg->parsed_uri.user.len, msg->parsed_uri.user.s);
+               return -1;
+       }
+       return 0;
+}
\ No newline at end of file
diff --git a/modules_k/dmq/message.h b/modules_k/dmq/message.h
new file mode 100644
index 0000000..7e0cb95
--- /dev/null
+++ b/modules_k/dmq/message.h
@@ -0,0 +1,2 @@
+
+int handle_dmq_message(struct sip_msg*, char*, char*);
\ No newline at end of file
diff --git a/modules_k/dmq/peer.c b/modules_k/dmq/peer.c
new file mode 100644
index 0000000..1ec177f
--- /dev/null
+++ b/modules_k/dmq/peer.c
@@ -0,0 +1,43 @@
+#include "peer.h"
+
+/**
+ * Allocate an empty peer list with shm_malloc().
+ *
+ * @return the zero-initialised list, or NULL if the allocation failed.
+ */
+dmq_peer_list_t* init_peer_list() {
+       dmq_peer_list_t* peer_list = shm_malloc(sizeof(dmq_peer_list_t));
+       if(!peer_list) {
+               LM_ERR("no more shm memory\n");
+               return NULL;
+       }
+       memset(peer_list, 0, sizeof(dmq_peer_list_t));
+       return peer_list;
+}
+
+/**
+ * Look up a peer in the list by its peer id.
+ *
+ * The ids are compared case-insensitively over the length of the shorter
+ * one.
+ * NOTE(review): because of that, an id that is a prefix of another id (or an
+ * empty id) also matches - confirm whether an exact-length match was meant.
+ *
+ * @param peer_list list to search
+ * @param peer      peer whose peer_id is looked up; no other field is read
+ * @return the first matching list entry, or NULL if there is none or if one
+ *         of the arguments is NULL.
+ */
+dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
+       dmq_peer_t* cur;
+       int len;
+       if(!peer_list || !peer) {
+               return NULL;
+       }
+       /* advance on every iteration, otherwise a non-matching entry loops forever */
+       for(cur = peer_list->peers; cur; cur = cur->next) {
+               /* len - the minimum length of the two strings */
+               len = cur->peer_id.len < peer->peer_id.len ? cur->peer_id.len : peer->peer_id.len;
+               if(strncasecmp(cur->peer_id.s, peer->peer_id.s, len) == 0) {
+                       return cur;
+               }
+       }
+       return NULL;
+}
+
+/**
+ * Insert a copy of peer at the front of peer_list.
+ *
+ * The structure is copied by assignment, so the peer_id and description
+ * buffers are shared with the caller's peer, not duplicated.
+ * NOTE(review): confirm that those buffers outlive the list entry.
+ *
+ * @param peer_list list to extend
+ * @param peer      peer to copy
+ * @return 0 on success, -1 if the copy could not be allocated.
+ */
+int add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
+       dmq_peer_t* new_peer = shm_malloc(sizeof(dmq_peer_t));
+       if(!new_peer) {
+               LM_ERR("no more shm memory\n");
+               return -1;
+       }
+       *new_peer = *peer;
+       new_peer->next = peer_list->peers;
+       peer_list->peers = new_peer;
+       return 0;
+}
+
+/**
+ * Register a new peer in the global peer list.
+ *
+ * @param peer peer to register; rejected when search_peer_list() already
+ *             finds an entry for its peer_id
+ * @return 0 on success, -1 if peer is NULL, the peer list is not allocated,
+ *         the peer already exists or no memory is left.
+ */
+int register_dmq_peer(dmq_peer_t* peer) {
+       if(!peer || !peer_list) {
+               LM_ERR("invalid peer or peer list not initialized\n");
+               return -1;
+       }
+       if(search_peer_list(peer_list, peer)) {
+               LM_ERR("peer already exists: %.*s %.*s\n", peer->peer_id.len, peer->peer_id.s,
+                      peer->description.len, peer->description.s);
+               return -1;
+       }
+       return add_peer(peer_list, peer);
+}
+
+/**
+ * Find the registered peer that matches the given id.
+ *
+ * @param peer_id id to look up in the global peer list
+ * @return whatever search_peer_list() returns for that id.
+ */
+dmq_peer_t* find_peer(str peer_id) {
+       /* search_peer_list() expects a peer, so wrap the id in a temporary one;
+        * only its peer_id field is read during the search */
+       dmq_peer_t lookup_key;
+       dmq_peer_t* match;
+       lookup_key.peer_id = peer_id;
+       match = search_peer_list(peer_list, &lookup_key);
+       return match;
+}
\ No newline at end of file
diff --git a/modules_k/dmq/peer.h b/modules_k/dmq/peer.h
new file mode 100644
index 0000000..72c851f
--- /dev/null
+++ b/modules_k/dmq/peer.h
@@ -0,0 +1,35 @@
+#ifndef PEER_H
+#define PEER_H
+
+#include <string.h>
+#include <stdlib.h>
+#include "../../str.h"
+#include "../../mem/mem.h"
+#include "../../mem/shm_mem.h"
+#include "../../parser/msg_parser.h"
+
+typedef int(*peer_callback_t)(struct sip_msg*);
+
+typedef struct dmq_peer { /* one registered DMQ peer, kept as a node of the peer list */
+       str peer_id; /* id matched against the user part of the request URI */
+       str description; /* only printed in log messages in the code shown here */
+       peer_callback_t callback; /* copied into every job queued for this peer */
+       struct dmq_peer* next; /* next peer in the list, NULL for the last one */
+} dmq_peer_t;
+
+typedef struct dmq_peer_list { /* head of the singly linked list of registered peers */
+       dmq_peer_t* peers; /* first peer; add_peer() inserts new peers at the front */
+       int count; /* zeroed by init_peer_list(); not updated in the code shown here */
+} dmq_peer_list_t;
+
+extern dmq_peer_list_t* peer_list;
+
+dmq_peer_list_t* init_peer_list();
+dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer);
+typedef int (*register_dmq_peer_t)(dmq_peer_t*);
+
+int register_dmq_peer(dmq_peer_t* peer);
+dmq_peer_t* find_peer(str peer_id);
+
+
+#endif
\ No newline at end of file
diff --git a/modules_k/dmq/worker.c b/modules_k/dmq/worker.c
new file mode 100644
index 0000000..6eaae21
--- /dev/null
+++ b/modules_k/dmq/worker.c
@@ -0,0 +1,120 @@
+#include "dmq.h"
+#include "worker.h"
+
+/**
+ * Endless job-processing loop of one DMQ worker.
+ *
+ * The worker blocks on its own lock, which add_dmq_job() releases after
+ * pushing a job. Once woken up it drains its queue, running the callback of
+ * every job and freeing the job afterwards.
+ *
+ * @param id index of this worker in the workers array
+ */
+void worker_loop(int id) {
+       dmq_worker_t* self = &workers[id];
+       dmq_job_t* job;
+       int rc;
+       while(1) {
+               LM_DBG("dmq_worker [%d %d] getting lock\n", id, my_pid());
+               lock_get(&self->lock);
+               LM_DBG("dmq_worker [%d %d] lock acquired\n", id, my_pid());
+               /* the lock may have been released several times meanwhile,
+                * so keep popping until the queue is empty */
+               do {
+                       job = job_queue_pop(self->queue);
+                       if(!job) {
+                               /* nothing was queued; fall through to the size check */
+                               continue;
+                       }
+                       rc = job->f(job->msg);
+                       if(rc < 0) {
+                               LM_ERR("running job failed\n");
+                       }
+                       shm_free(job);
+                       self->jobs_processed++;
+               } while(job_queue_size(self->queue) > 0);
+       }
+}
+
+/**
+ * Queue a job that runs the peer's callback on msg and wake up a worker.
+ *
+ * The first worker with an empty queue is chosen; if there is none, the
+ * worker with the fewest queued jobs gets the job.
+ *
+ * @param msg  message handed to the callback; only the pointer is stored
+ * @param peer peer whose callback is to be run
+ * @return 0 once the job was handed to a worker queue, -1 if peer or its
+ *         callback is NULL or if no workers were spawned.
+ */
+int add_dmq_job(struct sip_msg* msg, dmq_peer_t* peer) {
+       int i, found_available = 0;
+       int size, least_size;
+       dmq_job_t new_job;
+       dmq_worker_t* worker;
+       /* the worker calls the callback unconditionally, so refuse jobs without one */
+       if(!peer || !peer->callback) {
+               LM_ERR("error in add_dmq_job no callback to run\n");
+               return -1;
+       }
+       if(!num_workers) {
+               LM_ERR("error in add_dmq_job no workers spawned\n");
+               return -1;
+       }
+       new_job.f = peer->callback;
+       new_job.msg = msg;
+       new_job.orig_peer = peer;
+       /* the links are set by job_queue_push(); start from a defined state */
+       new_job.next = NULL;
+       new_job.prev = NULL;
+       /* initialize the worker with the first one */
+       worker = workers;
+       least_size = job_queue_size(worker->queue);
+       /* search for an available worker, or, if not possible, for the least busy one */
+       for(i = 0; i < num_workers; i++) {
+               size = job_queue_size(workers[i].queue);
+               if(size == 0) {
+                       worker = &workers[i];
+                       found_available = 1;
+                       break;
+               } else if(size < least_size) {
+                       worker = &workers[i];
+                       least_size = size;
+               }
+       }
+       if(!found_available) {
+               LM_DBG("no available worker found, passing job to the least busy one\n");
+       }
+       job_queue_push(worker->queue, &new_job);
+       /* releasing the worker's lock is what unblocks worker_loop() */
+       lock_release(&worker->lock);
+       return 0;
+}
+
+/**
+ * Prepare a worker slot before its loop is started.
+ *
+ * The structure is zeroed, its lock is initialised and taken, and a fresh
+ * job queue is attached.
+ *
+ * @param worker worker slot to initialise
+ */
+void init_worker(dmq_worker_t* worker) {
+       gen_lock_t* wakeup = &worker->lock;
+       memset(worker, 0, sizeof(dmq_worker_t));
+       lock_init(wakeup);
+       /* take the lock right away, so that worker_loop() blocks on its first
+        * lock_get() until a job has been added */
+       lock_get(wakeup);
+       worker->queue = alloc_job_queue();
+}
+
+/**
+ * Allocate and initialise an empty job queue with shm_malloc().
+ *
+ * @return the new queue with a zero count, no jobs and an initialised lock,
+ *         or NULL if the allocation failed.
+ */
+job_queue_t* alloc_job_queue() {
+       job_queue_t* queue = shm_malloc(sizeof(job_queue_t));
+       if(!queue) {
+               LM_ERR("no more shm memory\n");
+               return NULL;
+       }
+       atomic_set(&queue->count, 0);
+       queue->front = NULL;
+       queue->back = NULL;
+       lock_init(&queue->lock);
+       return queue;
+}
+
+/**
+ * Free a job queue together with any jobs still waiting in it.
+ *
+ * No lock is taken here.
+ * NOTE(review): this assumes nobody pushes to or pops from the queue any
+ * more when it is destroyed - confirm against the callers.
+ *
+ * @param queue queue to free; NULL is ignored
+ */
+void destroy_job_queue(job_queue_t* queue) {
+       dmq_job_t* job;
+       dmq_job_t* towards_front;
+       if(!queue) {
+               return;
+       }
+       /* job_queue_push() links every new job to the previous back element
+        * through next, so following next from back visits all queued jobs */
+       job = queue->back;
+       while(job) {
+               towards_front = job->next;
+               shm_free(job);
+               job = towards_front;
+       }
+       shm_free(queue);
+}
+
+/**
+ * Report how many jobs are currently counted in the queue.
+ *
+ * @param queue queue to inspect
+ * @return the current value of the queue's atomic job counter.
+ */
+int job_queue_size(job_queue_t* queue) {
+       int pending = atomic_get(&queue->count);
+       return pending;
+}
+
+/**
+ * Append a copy of job at the back of the queue.
+ *
+ * @param queue queue to extend
+ * @param job   job to copy; the caller keeps ownership of the original
+ *
+ * If no memory is left for the copy, the error is logged and the job is
+ * dropped, because the function has no return value to report it with.
+ */
+void job_queue_push(job_queue_t* queue, dmq_job_t* job) {
+       /* we need to copy the dmq_job into a newly created dmq_job in shm */
+       dmq_job_t* newjob = shm_malloc(sizeof(dmq_job_t));
+       if(!newjob) {
+               LM_ERR("no more shm memory, job dropped\n");
+               return;
+       }
+       *newjob = *job;
+
+       lock_get(&queue->lock);
+       /* the new job becomes the back; its next points towards the front */
+       newjob->prev = NULL;
+       newjob->next = queue->back;
+       if(queue->back) {
+               queue->back->prev = newjob;
+       }
+       queue->back = newjob;
+       if(!queue->front) {
+               /* first job in an empty queue is both front and back */
+               queue->front = newjob;
+       }
+       atomic_inc(&queue->count);
+       lock_release(&queue->lock);
+}
+/**
+ * Detach the oldest job from the front of the queue.
+ *
+ * @param queue queue to pop from
+ * @return the detached job, which the caller has to free, or NULL when the
+ *         queue is empty.
+ */
+dmq_job_t* job_queue_pop(job_queue_t* queue) {
+       dmq_job_t* oldest;
+       lock_get(&queue->lock);
+       oldest = queue->front;
+       if(oldest) {
+               if(oldest->prev) {
+                       /* the job pushed right after it becomes the new front */
+                       queue->front = oldest->prev;
+                       queue->front->next = NULL;
+               } else {
+                       /* it was the only job, the queue is empty now */
+                       queue->front = NULL;
+                       queue->back = NULL;
+               }
+               atomic_dec(&queue->count);
+       }
+       lock_release(&queue->lock);
+       return oldest;
+}
\ No newline at end of file
diff --git a/modules_k/dmq/worker.h b/modules_k/dmq/worker.h
new file mode 100644
index 0000000..61eda09
--- /dev/null
+++ b/modules_k/dmq/worker.h
@@ -0,0 +1,43 @@
+#ifndef DMQ_WORKER_H
+#define DMQ_WORKER_H
+
+#include "peer.h"
+#include "../../locking.h"
+#include "../../atomic_ops.h"
+#include "../../parser/msg_parser.h"
+
+typedef struct dmq_job { /* one unit of work queued for a DMQ worker */
+       peer_callback_t f; /* callback the worker runs with msg as argument */
+       struct sip_msg* msg; /* message passed to f; only the pointer is stored */
+       dmq_peer_t* orig_peer; /* peer whose callback was copied into f */
+       struct dmq_job* next; /* neighbour closer to the front of the queue */
+       struct dmq_job* prev; /* neighbour closer to the back of the queue */
+} dmq_job_t;
+
+typedef struct job_queue { /* doubly linked FIFO of jobs: pushed at back, popped at front */
+       atomic_t count; /* number of queued jobs, adjusted on every push and pop */
+       struct dmq_job* back; /* most recently pushed job, NULL when empty */
+       struct dmq_job* front; /* oldest job, the next one to be popped, NULL when empty */
+       gen_lock_t lock; /* held while a push or pop relinks the list */
+} job_queue_t;
+
+struct dmq_worker { /* state of one DMQ worker */
+       job_queue_t* queue; /* jobs waiting for this worker */
+       int jobs_processed; /* incremented by worker_loop() after each job it ran */
+       gen_lock_t lock; /* worker_loop() blocks on it; add_dmq_job() releases it after a push */
+       int pid; /* zeroed by init_worker(); not otherwise used in the code shown here */
+};
+
+typedef struct dmq_worker dmq_worker_t;
+
+void init_worker(dmq_worker_t* worker);
+int add_dmq_job(struct sip_msg*, dmq_peer_t*);
+void worker_loop(int id);
+
+job_queue_t* alloc_job_queue();
+void destroy_job_queue(job_queue_t* queue);
+void job_queue_push(job_queue_t* queue, dmq_job_t* job);
+dmq_job_t* job_queue_pop(job_queue_t* queue);
+int job_queue_size(job_queue_t* queue);
+
+#endif
\ No newline at end of file


_______________________________________________
sr-dev mailing list
[email protected]
http://lists.sip-router.org/cgi-bin/mailman/listinfo/sr-dev

Reply via email to