commit 68f8e883f83f68c707b3be56e9bcbe159add14cd
Author: Charles Chance <charles.chance@sipcentric.com>
Date:   Thu Oct 3 17:35:52 2013 +0100

    htable: dmq initial integration

diff --git a/modules/htable/Makefile b/modules/htable/Makefile
index c1e683a..de7ea25 100644
--- a/modules/htable/Makefile
+++ b/modules/htable/Makefile
@@ -16,4 +16,5 @@ SERLIBPATH=../../lib
 SER_LIBS+=$(SERLIBPATH)/kmi/kmi
 SER_LIBS+=$(SERLIBPATH)/srdb1/srdb1
 SER_LIBS+=$(SERLIBPATH)/kcore/kcore
+SER_LIBS+=$(SERLIBPATH)/srutils/srutils
 include ../../Makefile.modules
diff --git a/modules/htable/api.c b/modules/htable/api.c
index 27d13a0..a3deccc 100644
--- a/modules/htable/api.c
+++ b/modules/htable/api.c
@@ -28,6 +28,7 @@
 
 #include "ht_api.h"
 #include "api.h"
+#include "ht_dmq.h"
 
 /**
  *
@@ -39,6 +40,11 @@ int ht_api_set_cell(str *hname, str *name, int type,
 	ht = ht_get_table(hname);
 	if(ht==NULL)
 		return -1;
+
+	if (ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_SET_CELL, hname, name, type, val, mode)!=0) {
+		LM_ERR("dmq relication failed\n");
+	}
+
 	return ht_set_cell(ht, name, type, val, mode);
 }
 
@@ -51,6 +57,9 @@ int ht_api_del_cell(str *hname, str *name)
 	ht = ht_get_table(hname);
 	if(ht==NULL)
 		return -1;
+	if (ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_DEL_CELL, hname, name, 0, NULL, 0)!=0) {
+		LM_ERR("dmq relication failed\n");
+	}
 	return ht_del_cell(ht, name);
 }
 
@@ -64,6 +73,9 @@ int ht_api_set_cell_expire(str *hname, str *name,
 	ht = ht_get_table(hname);
 	if(ht==NULL)
 		return -1;
+	if (ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_SET_CELL_EXPIRE, hname, name, type, val, 0)!=0) {
+		LM_ERR("dmq relication failed\n");
+	}
 	return ht_set_cell_expire(ht, name, type, val);
 }
 
@@ -86,9 +98,17 @@ int ht_api_get_cell_expire(str *hname, str *name,
 int ht_api_rm_cell_re(str *hname, str *sre, int mode)
 {
 	ht_t* ht;
+	int_str isval;
 	ht = ht_get_table(hname);
 	if(ht==NULL)
 		return -1;
+	if (ht->dmqreplicate>0) {
+		isval.s.s = sre->s;
+		isval.s.len = sre->len;
+		if (ht_dmq_replicate_action(HT_DMQ_RM_CELL_RE, hname, NULL, AVP_VAL_STR, &isval, mode)!=0) {
+			LM_ERR("dmq relication failed\n");
+		}
+	}
 	if(ht_rm_cell_re(sre, ht, mode /* 0 - name; 1 - value */)<0)
 		return -1;
 	return 0;
diff --git a/modules/htable/doc/htable_admin.xml b/modules/htable/doc/htable_admin.xml
index 815d16b..3279738 100644
--- a/modules/htable/doc/htable_admin.xml
+++ b/modules/htable/doc/htable_admin.xml
@@ -31,6 +31,10 @@
 		be adjusted per itme via assignment operation at runtime.
 	</para>
 	<para>
+		Replication between multiple servers is performed automatically (if 
+		enabled) via the DMQ module.
+	</para>
+	<para>
 		You can read more about hash tables at:
 		http://en.wikipedia.org/wiki/Hash_table.
 	</para>
@@ -121,7 +125,7 @@ if(is_present_hf("Authorization"))
 			<itemizedlist>
 			<listitem>
 			<para>
-				<emphasis>No dependencies on other &kamailio; modules</emphasis>.
+				<emphasis>If DMQ replication is enabled, the DMQ module must be loaded first.</emphasis>.
 			</para>
 			</listitem>
 			</itemizedlist>
@@ -272,6 +276,11 @@ if(is_present_hf("Authorization"))
 			<emphasis>updateexpire</emphasis> - if set to 1 (default), the time until expiration of an item is reset when that item is updated.  Certain uses of htable may dictate that updates should not reset the expiration timeout, however, in which case this attribute can be set to 0.
 		</para>
 		</listitem>
+		<listitem>
+		<para>
+			<emphasis>dmqreplicate</emphasis> - if set to 1, any actions (set, update, delete etc.) performed upon entries in this table will be replicated to other nodes (htable peers). Please note, module parameter "enable_dmq" must also be set in order for this to apply (see below). Default is 0 (no replication).
+		</para>
+		</listitem>
 		</itemizedlist>
 		<para>
 		<emphasis>
@@ -284,7 +293,7 @@ if(is_present_hf("Authorization"))
 ...
 modparam("htable", "htable", "a=&gt;size=4;autoexpire=7200;dbtable=htable_a;")
 modparam("htable", "htable", "b=&gt;size=5;")
-modparam("htable", "htable", "c=&gt;size=4;autoexpire=7200;initval=1;")
+modparam("htable", "htable", "c=&gt;size=4;autoexpire=7200;initval=1;dmqreplicate=1;")
 ...
 </programlisting>
 		</example>
@@ -504,6 +513,33 @@ modparam("htable", "db_expires", 1)
 </programlisting>
 		</example>
 	</section>
+	<section>
+		<title><varname>enable_dmq</varname> (integer)</title>
+		<para>
+			If set to 1, will enable DMQ replication of actions performed upon 
+			entries in all tables having "dmqreplicate" parameter set. Any update 
+			action performed via psuedo-variables, MI and RPC commands will be 
+			repeated on all other nodes. Therefore, it is important to ensure the
+			table definition (size, autoexpire etc.) is identical across all instances.
+		</para>
+		<para>
+			Currently, values are not replicated on load from DB as it is expected 
+			that in these cases, all servers will load their values from the same DB.
+		</para>
+		<para>
+		<emphasis>
+			Default value is 0.
+		</emphasis>
+		</para>
+		<example>
+		<title>Set <varname>enable_dmq</varname> parameter</title>
+		<programlisting format="linespecific">
+...
+modparam("htable", "enable_dmq", 1)
+...
+</programlisting>
+		</example>
+	</section>
 	</section>
 	<section>
 	<title>Functions</title>
diff --git a/modules/htable/ht_api.c b/modules/htable/ht_api.c
index 3dd96cc..f55e784 100644
--- a/modules/htable/ht_api.c
+++ b/modules/htable/ht_api.c
@@ -218,7 +218,7 @@ ht_t* ht_get_table(str *name)
 }
 
 int ht_add_table(str *name, int autoexp, str *dbtable, int size, int dbmode,
-		int itype, int_str *ival, int updateexpire)
+		int itype, int_str *ival, int updateexpire, int dmqreplicate)
 {
 	unsigned int htid;
 	ht_t *ht;
@@ -261,7 +261,7 @@ int ht_add_table(str *name, int autoexp, str *dbtable, int size, int dbmode,
 	ht->flags = itype;
 	if(ival!=NULL)
 		ht->initval = *ival;
-
+	ht->dmqreplicate = dmqreplicate;
 	ht->next = _ht_root;
 	_ht_root = ht;
 	return 0;
@@ -761,6 +761,7 @@ int ht_table_spec(char *spec)
 	unsigned int size = 4;
 	unsigned int dbmode = 0;
 	unsigned int updateexpire = 1;
+	unsigned int dmqreplicate = 0;
 	str in;
 	str tok;
 	param_t *pit=NULL;
@@ -817,11 +818,16 @@ int ht_table_spec(char *spec)
 				goto error;
 
 			LM_DBG("htable [%.*s] - updateexpire [%u]\n", name.len, name.s, updateexpire); 
+		} else if(pit->name.len == 12 && strncmp(pit->name.s, "dmqreplicate", 12) == 0) {
+			if(str2int(&tok, &dmqreplicate) != 0)
+				goto error;
+
+			LM_DBG("htable [%.*s] - dmqreplicate [%u]\n", name.len, name.s, dmqreplicate); 
 		} else { goto error; }
 	}
 
 	return ht_add_table(&name, autoexpire, &dbtable, size, dbmode,
-			itype, &ival, updateexpire);
+			itype, &ival, updateexpire, dmqreplicate);
 
 error:
 	LM_ERR("invalid htable parameter [%.*s]\n", in.len, in.s);
diff --git a/modules/htable/ht_api.h b/modules/htable/ht_api.h
index 98b2eae..f1b504b 100644
--- a/modules/htable/ht_api.h
+++ b/modules/htable/ht_api.h
@@ -62,6 +62,7 @@ typedef struct _ht
 	int_str initval;
 	int updateexpire;
 	unsigned int htsize;
+	int dmqreplicate;
 	ht_entry_t *entries;
 	struct _ht *next;
 } ht_t;
@@ -73,7 +74,7 @@ typedef struct _ht_pv {
 } ht_pv_t, *ht_pv_p;
 
 int ht_add_table(str *name, int autoexp, str *dbtable, int size, int dbmode,
-		int itype, int_str *ival, int updateexpire);
+		int itype, int_str *ival, int updateexpire, int dmqreplicate);
 int ht_init_tables(void);
 int ht_destroy(void);
 int ht_set_cell(ht_t *ht, str *name, int type, int_str *val, int mode);
diff --git a/modules/htable/ht_dmq.c b/modules/htable/ht_dmq.c
new file mode 100644
index 0000000..5558d07
--- /dev/null
+++ b/modules/htable/ht_dmq.c
@@ -0,0 +1,282 @@
+/*
+ * $Id$
+ *
+ * Copyright (C) 2013 Charles Chance (Sipcentric Ltd)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+
+#include "ht_dmq.h"
+#include "ht_api.h"
+
+static str ht_dmq_content_type = str_init("application/json");
+static str dmq_200_rpl  = str_init("OK");
+static str dmq_400_rpl  = str_init("Bad Request");
+static str dmq_500_rpl  = str_init("Server Internal Error");
+
+typedef struct _ht_dmq_repdata {
+	int action;
+	str htname;
+	str cname;
+	int type;
+	int intval;
+	str strval;
+	int expire;
+} ht_dmq_repdata_t;
+
+dmq_api_t ht_dmqb;
+dmq_peer_t* ht_dmq_peer;
+dmq_resp_cback_t ht_dmq_resp_callback = {&ht_dmq_resp_callback_f, 0};
+
+/**
+ * @brief add notification peer
+ */
+int ht_dmq_initialize()
+{
+	/* load the DMQ API */
+	if (dmq_load_api(&ht_dmqb)!=0) {
+		LM_ERR("cannot load dmq api\n");
+		return -1;
+	} else {
+		LM_DBG("loaded dmq api\n");
+	}
+
+	dmq_peer_t not_peer;
+	not_peer.callback = ht_dmq_handle_msg;
+	not_peer.description.s = "htable";
+	not_peer.description.len = 6;
+	not_peer.peer_id.s = "htable";
+	not_peer.peer_id.len = 6;
+	ht_dmq_peer = ht_dmqb.register_dmq_peer(&not_peer);
+	if(!ht_dmq_peer) {
+		LM_ERR("error in register_dmq_peer\n");
+		goto error;
+	} else {
+		LM_DBG("dmq peer registered\n");
+	}
+	return 0;
+error:
+	return -1;
+}
+
+int ht_dmq_broadcast(str* body) {
+        if (!ht_dmq_peer) {
+                LM_ERR("ht_dmq_peer is null!\n");
+                return -1;
+        }
+        LM_DBG("sending broadcast...\n");
+        ht_dmqb.bcast_message(ht_dmq_peer, body, 0, &ht_dmq_resp_callback, 1, &ht_dmq_content_type);
+        return 0;
+}
+
+/**
+ * @brief ht dmq callback
+ */
+int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp)
+{
+	int content_length;
+	str body;
+	ht_dmq_action_t action = HT_DMQ_NONE;
+	str htname, cname;
+	int type = 0, mode = 0;
+	int_str val;
+	srjson_doc_t jdoc;
+	srjson_t *it = NULL;
+
+	/* received dmq message */
+	LM_DBG("dmq message received\n");
+	/* parse the message headers */
+	if(parse_headers(msg, HDR_EOH_F, 0) < 0) {
+		LM_ERR("error parsing message headers\n");
+		goto error;
+	}
+	
+	if(!msg->content_length) {
+		LM_ERR("no content length header found\n");
+		goto invalid;
+	}
+	content_length = get_content_length(msg);
+	if(!content_length) {
+		LM_DBG("content length is 0\n");
+		goto invalid;
+	}
+
+	body.s = get_body(msg);
+	body.len = content_length;
+
+	if (!body.s) {
+		LM_ERR("unable to get body\n");
+		goto error;
+	}
+
+	/* parse body */
+	LM_DBG("body: %.*s\n", body.len, body.s);	
+
+	srjson_InitDoc(&jdoc, NULL);
+	jdoc.buf = body;
+
+	if(jdoc.root == NULL) {
+		jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s);
+		if(jdoc.root == NULL)
+		{
+			LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s);
+			goto invalid;
+		}
+	}
+
+	for(it=jdoc.root->child; it; it = it->next)
+	{
+		LM_DBG("found field: %s\n", it->string);
+		if (strcmp(it->string, "action")==0) {
+			action = it->valueint;
+		} else if (strcmp(it->string, "htname")==0) {
+			htname.s = it->valuestring;
+			htname.len = strlen(htname.s);
+		} else if (strcmp(it->string, "cname")==0) {
+			cname.s = it->valuestring;
+			cname.len = strlen(cname.s);
+		} else if (strcmp(it->string, "type")==0) {
+			type = it->valueint;
+		} else if (strcmp(it->string, "strval")==0) {
+			val.s.s = it->valuestring;
+			val.s.len = strlen(val.s.s);
+		} else if (strcmp(it->string, "intval")==0) {
+			val.n = it->valueint;
+		} else if (strcmp(it->string, "mode")==0) {
+			mode = it->valueint;
+		} else {
+			LM_ERR("unrecognized field in json object\n");
+			goto invalid;
+		}
+	}	
+
+	if (ht_dmq_replay_action(action, &htname, &cname, type, &val, mode)!=0) {
+		LM_ERR("failed to replay action\n");
+		goto error;
+	}
+
+	srjson_DestroyDoc(&jdoc);
+	resp->reason = dmq_200_rpl;
+	resp->resp_code = 200;
+	return 0;
+
+invalid:
+	srjson_DestroyDoc(&jdoc);
+	resp->reason = dmq_400_rpl;
+	resp->resp_code = 400;
+	return 0;
+
+error:
+	srjson_DestroyDoc(&jdoc);
+	resp->reason = dmq_500_rpl;
+	resp->resp_code = 500;	
+	return 0;
+}
+
+int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode) {
+
+	LM_DBG("entering ht_dmq_replicate_action...\n");
+
+	srjson_doc_t jdoc;
+
+	srjson_InitDoc(&jdoc, NULL);
+
+	jdoc.root = srjson_CreateObject(&jdoc);
+	if(jdoc.root==NULL) {
+		LM_ERR("cannot create json root\n");
+		goto error;
+	}
+
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "action", action);
+	srjson_AddStrToObject(&jdoc, jdoc.root, "htname", htname->s, htname->len);
+	if (cname!=NULL) {
+		srjson_AddStrToObject(&jdoc, jdoc.root, "cname", cname->s, cname->len);
+	}
+
+	if (action==HT_DMQ_SET_CELL || action==HT_DMQ_SET_CELL_EXPIRE || action==HT_DMQ_RM_CELL_RE) {
+		srjson_AddNumberToObject(&jdoc, jdoc.root, "type", type);
+		if (type&AVP_VAL_STR) {
+			srjson_AddStrToObject(&jdoc, jdoc.root, "strval", val->s.s, val->s.len);
+		} else {
+			srjson_AddNumberToObject(&jdoc, jdoc.root, "intval", val->n);
+		}
+	}
+
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "mode", mode);	
+
+	jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
+	if(jdoc.buf.s!=NULL) {
+		jdoc.buf.len = strlen(jdoc.buf.s);
+		LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
+		if (ht_dmq_broadcast(&jdoc.buf)!=0) {
+			goto error;
+		}
+		jdoc.free_fn(jdoc.buf.s);
+		jdoc.buf.s = NULL;
+	} else {
+		LM_ERR("unable to serialize data\n");
+		goto error;
+	}
+
+	srjson_DestroyDoc(&jdoc);
+	return 0;
+
+error:
+	if(jdoc.buf.s!=NULL) {
+		jdoc.free_fn(jdoc.buf.s);
+		jdoc.buf.s = NULL;
+	}
+	srjson_DestroyDoc(&jdoc);
+	return -1;
+}
+
+int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode) {
+
+	LM_DBG("replaying action %d on %.*s=>%.*s...\n", action, htname->len, htname->s, cname->len, cname->s);
+
+	ht_t* ht;
+	ht = ht_get_table(htname);
+	if(ht==NULL) {
+		LM_ERR("unable to get table\n");
+		return -1;
+	}
+
+	if (action==HT_DMQ_SET_CELL) {
+		return ht_set_cell(ht, cname, type, val, mode);
+	} else if (action==HT_DMQ_SET_CELL_EXPIRE) {
+		return ht_set_cell_expire(ht, cname, 0, val);
+	} else if (action==HT_DMQ_DEL_CELL) {
+		return ht_del_cell(ht, cname);
+	} else if (action==HT_DMQ_RM_CELL_RE) {
+		return ht_rm_cell_re(&val->s, ht, mode);
+	} else {
+		LM_ERR("unrecognized action");
+		return -1;
+	}
+}
+
+/**
+ * @brief dmq response callback
+ */
+int ht_dmq_resp_callback_f(struct sip_msg* msg, int code,
+		dmq_node_t* node, void* param)
+{
+	LM_DBG("dmq response callback triggered [%p %d %p]\n", msg, code, param);
+	return 0;
+}
\ No newline at end of file
diff --git a/modules/htable/ht_dmq.h b/modules/htable/ht_dmq.h
new file mode 100644
index 0000000..045bd37
--- /dev/null
+++ b/modules/htable/ht_dmq.h
@@ -0,0 +1,49 @@
+/**
+ * $Id$
+ *
+ * Copyright (C) 2013 Charles Chance (Sipcentric Ltd)
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+#ifndef _HT_DMQ_H_
+#define _HT_DMQ_H_
+
+#include "../dmq/bind_dmq.h"
+#include "../../lib/srutils/srjson.h"
+#include "../../parser/msg_parser.h"
+#include "../../parser/parse_content.h"
+
+extern dmq_api_t ht_dmqb;
+extern dmq_peer_t* ht_dmq_peer;
+extern dmq_resp_cback_t ht_dmq_resp_callback;
+
+typedef enum {
+		HT_DMQ_NONE,
+        HT_DMQ_SET_CELL,
+        HT_DMQ_SET_CELL_EXPIRE,
+        HT_DMQ_DEL_CELL,
+        HT_DMQ_RM_CELL_RE
+} ht_dmq_action_t;
+
+int ht_dmq_initialize();
+int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp);
+int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode);
+int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode);
+int ht_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
+
+#endif
diff --git a/modules/htable/ht_var.c b/modules/htable/ht_var.c
index 87f85c9..4ef38be 100644
--- a/modules/htable/ht_var.c
+++ b/modules/htable/ht_var.c
@@ -22,6 +22,7 @@
 		       
 #include "ht_api.h"
 #include "ht_var.h"
+#include "ht_dmq.h"
 
 /* pkg copy */
 ht_cell_t *_htc_local=NULL;
@@ -90,6 +91,9 @@ int pv_set_ht_cell(struct sip_msg* msg, pv_param_t *param,
 	if((val==NULL) || (val->flags&PV_VAL_NULL))
 	{
 		/* delete it */
+		if (hpv->ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_DEL_CELL, &hpv->htname, &htname, 0, NULL, 0)!=0) {
+			LM_ERR("dmq relication failed\n");
+		}
 		ht_del_cell(hpv->ht, &htname);
 		return 0;
 	}
@@ -97,6 +101,9 @@ int pv_set_ht_cell(struct sip_msg* msg, pv_param_t *param,
 	if(val->flags&PV_TYPE_INT)
 	{
 		isval.n = val->ri;
+		if (hpv->ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_SET_CELL, &hpv->htname, &htname, 0, &isval, 1)!=0) {
+			LM_ERR("dmq relication failed\n");
+		}
 		if(ht_set_cell(hpv->ht, &htname, 0, &isval, 1)!=0)
 		{
 			LM_ERR("cannot set $ht(%.*s)\n", htname.len, htname.s);
@@ -104,6 +111,9 @@ int pv_set_ht_cell(struct sip_msg* msg, pv_param_t *param,
 		}
 	} else {
 		isval.s = val->rs;
+		if (hpv->ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_SET_CELL, &hpv->htname, &htname, AVP_VAL_STR, &isval, 1)!=0) {
+			LM_ERR("dmq relication failed\n");
+		}
 		if(ht_set_cell(hpv->ht, &htname, AVP_VAL_STR, &isval, 1)!=0)
 		{
 			LM_ERR("cannot set $ht(%.*s)\n", htname.len, htname.s);
@@ -229,6 +239,9 @@ int pv_set_ht_cell_expire(struct sip_msg* msg, pv_param_t *param,
 		if(val->flags&PV_TYPE_INT)
 			isval.n = val->ri;
 	}
+	if (hpv->ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_SET_CELL_EXPIRE, &hpv->htname, &htname, 0, &isval, 0)!=0) {
+		LM_ERR("dmq relication failed\n");
+	}	
 	if(ht_set_cell_expire(hpv->ht, &htname, 0, &isval)!=0)
 	{
 		LM_ERR("cannot set $ht(%.*s)\n", htname.len, htname.s);
@@ -327,6 +340,11 @@ int pv_get_ht_add(struct sip_msg *msg,  pv_param_t *param,
 		return pv_get_null(msg, param, res);
 
 	/* integer */
+	if (hpv->ht->dmqreplicate>0) {
+		if (ht_dmq_replicate_action(HT_DMQ_SET_CELL, &hpv->htname, &htname, 0, &htc->value, 1)!=0) {
+			LM_ERR("dmq relication failed\n");
+		}
+	}	
 	return pv_get_sintval(msg, param, res, htc->value.n);
 }
 
diff --git a/modules/htable/htable.c b/modules/htable/htable.c
index 5dba9e0..1900ec5 100644
--- a/modules/htable/htable.c
+++ b/modules/htable/htable.c
@@ -44,12 +44,14 @@
 #include "ht_db.h"
 #include "ht_var.h"
 #include "api.h"
+#include "ht_dmq.h"
 
 
 MODULE_VERSION
 
 int  ht_timer_interval = 20;
 int  ht_db_expires_flag = 0;
+int  ht_enable_dmq = 0;
 
 static int htable_init_rpc(void);
 
@@ -124,6 +126,7 @@ static param_export_t params[]={
 	{"fetch_rows",         INT_PARAM, &ht_fetch_rows},
 	{"timer_interval",     INT_PARAM, &ht_timer_interval},
 	{"db_expires",         INT_PARAM, &ht_db_expires_flag},
+	{"enable_dmq",         INT_PARAM, &ht_enable_dmq},
 	{0,0,0}
 };
 
@@ -188,6 +191,12 @@ static int mod_init(void)
 			return -1;
 		}
 	}
+
+	if (ht_enable_dmq>0 && ht_dmq_initialize()!=0) {
+		LM_ERR("failed to initialize dmq integration\n");
+		return -1;
+	}
+
 	return 0;
 }
 
@@ -286,6 +295,7 @@ static int ht_rm_name_re(struct sip_msg* msg, char* key, char* foo)
 	str sre;
 	pv_spec_t *sp;
 	sp = (pv_spec_t*)key;
+	int_str isval;
 
 	hpv = (ht_pv_t*)sp->pvp.pvn.u.dname;
 
@@ -300,6 +310,12 @@ static int ht_rm_name_re(struct sip_msg* msg, char* key, char* foo)
 		LM_ERR("cannot get $ht expression\n");
 		return -1;
 	}
+	if (hpv->ht->dmqreplicate>0) {
+		isval.s = sre;
+		if (ht_dmq_replicate_action(HT_DMQ_RM_CELL_RE, &hpv->htname, NULL, AVP_VAL_STR, &isval, 0)!=0) {
+			LM_ERR("dmq relication failed\n");
+		}
+	}
 	if(ht_rm_cell_re(&sre, hpv->ht, 0)<0)
 		return -1;
 	return 1;
@@ -311,6 +327,7 @@ static int ht_rm_value_re(struct sip_msg* msg, char* key, char* foo)
 	str sre;
 	pv_spec_t *sp;
 	sp = (pv_spec_t*)key;
+	int_str isval;
 
 	hpv = (ht_pv_t*)sp->pvp.pvn.u.dname;
 
@@ -326,6 +343,12 @@ static int ht_rm_value_re(struct sip_msg* msg, char* key, char* foo)
 		return -1;
 	}
 
+	if (hpv->ht->dmqreplicate>0) {
+		isval.s = sre;
+		if (ht_dmq_replicate_action(HT_DMQ_RM_CELL_RE, &hpv->htname, NULL, AVP_VAL_STR, &isval, 1)!=0) {
+			LM_ERR("dmq relication failed\n");
+		}
+	}
 	if(ht_rm_cell_re(&sre, hpv->ht, 1)<0)
 		return -1;
 	return 1;
@@ -531,6 +554,10 @@ static struct mi_root* ht_mi_delete(struct mi_root* cmd_tree, void* param) {
 	if (!ht)
 		return init_mi_tree(404, MI_BAD_PARM_S, MI_BAD_PARM_LEN);
 
+	if (ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_DEL_CELL, &ht->name, key, 0, NULL, 0)!=0) {
+		LM_ERR("dmq relication failed\n");
+	}
+
 	ht_del_cell(ht, key);
 
 	return init_mi_tree(200, MI_OK_S, MI_OK_LEN);
@@ -655,6 +682,10 @@ static void htable_rpc_delete(rpc_t* rpc, void* c) {
 		return;
 	}
 
+	if (ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_DEL_CELL, &ht->name, &keyname, 0, NULL, 0)!=0) {
+		LM_ERR("dmq relication failed\n");
+	}
+
 	ht_del_cell(ht, &keyname);
 }
 
@@ -737,6 +768,10 @@ static void htable_rpc_sets(rpc_t* rpc, void* c) {
 		return;
 	}
 	
+	if (ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_SET_CELL, &ht->name, &keyname, AVP_VAL_STR, &keyvalue, 1)!=0) {
+		LM_ERR("dmq relication failed\n");
+	}
+
 	if(ht_set_cell(ht, &keyname, AVP_VAL_STR, &keyvalue, 1)!=0)
 	{
 		LM_ERR("cannot set $ht(%.*s=>%.*s)\n", htname.len, htname.s,
@@ -766,6 +801,10 @@ static void htable_rpc_seti(rpc_t* rpc, void* c) {
 		rpc->fault(c, 500, "No such htable");
 		return;
 	}
+
+	if (ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_SET_CELL, &ht->name, &keyname, 0, &keyvalue, 1)!=0) {
+		LM_ERR("dmq relication failed\n");
+	}
 	
 	if(ht_set_cell(ht, &keyname, 0, &keyvalue, 1)!=0)
 	{
@@ -886,13 +925,14 @@ static void  htable_rpc_list(rpc_t* rpc, void* c)
 			dbname[0] = '\0';
 		}
 
-		if(rpc->struct_add(th, "Ssdddd",
+		if(rpc->struct_add(th, "Ssddddd",
 						"name", &ht->name,	/* String */
 						"dbtable", &dbname ,	/* Char * */
 						"dbmode", (int)  ht->dbmode,		/* u int */
 						"expire", (int) ht->htexpire,		/* u int */
 						"updateexpire", ht->updateexpire,	/* int */
-						"size", (int) ht->htsize		/* u int */
+						"size", (int) ht->htsize,			/* u int */
+						"dmqreplicate", ht->dmqreplicate	/* int */
 						) < 0) {
 			rpc->fault(c, 500, "Internal error creating data rpc");
 			goto error;
