Sometimes, when a remote node restarts abruptly, queues that were created on
that node cannot be opened again once the node comes back up.

When the remote node goes down, there is a race between msgd receiving the
CLM and MDS events indicating node down and immd removing the implementer
for that remote node. When msgd gets the CLM and MDS events indicating node
down, it temporarily becomes the implementer for any queues on that node so
that it can remove their entries in IMM. If IMM has not yet removed the remote
node's implementer, the implementer set fails with ERR_EXIST and msgd is
unable to remove the IMM entries. When the remote node comes back up and the
queues are opened, the opens fail because the IMM entries are still there.

When msgd receives ERR_EXIST from the implementer set call in this case, it
should treat it as TRY_AGAIN.
---
 src/msg/msgd/mqd_clm.c  | 60 +++++++++++++++++++++++++++++++++++++------------
 src/msg/msgd/mqd_db.h   |  2 +-
 src/msg/msgd/mqd_evt.c  | 12 ++++++++--
 src/msg/msgd/mqd_util.c |  2 +-
 4 files changed, 58 insertions(+), 18 deletions(-)

diff --git a/src/msg/msgd/mqd_clm.c b/src/msg/msgd/mqd_clm.c
index 912d5a3f5..41d9bcf15 100644
--- a/src/msg/msgd/mqd_clm.c
+++ b/src/msg/msgd/mqd_clm.c
@@ -26,6 +26,7 @@
 the cluster track
 
*************************************************************************************************/
 
+#include "base/osaf_time.h"
 #include "msg/msgd/mqd.h"
 #include "mqd_imm.h"
 extern MQDLIB_INFO gl_mqdinfo;
@@ -56,11 +57,10 @@ void mqd_clm_cluster_track_callback(
        } else {
                for (counter = 0; counter < notificationBuffer->numberOfItems;
                     counter++) {
+                       node_id = notificationBuffer->notification[counter].
+                               clusterNode.nodeId;
                        if (notificationBuffer->notification[counter]
                                .clusterChange == SA_CLM_NODE_LEFT) {
-                               node_id =
-                                   notificationBuffer->notification[counter]
-                                       .clusterNode.nodeId;
                                pNdNode =
                                    (MQD_ND_DB_NODE *)ncs_patricia_tree_get(
                                        &pMqd->node_db, (uint8_t *)&node_id);
@@ -78,6 +78,8 @@ void mqd_clm_cluster_track_callback(
                                                    true;
                                        }
                                } else {
+                                       SaTimeT timeout =
+            m_NCS_CONVERT_SATIME_TO_TEN_MILLI_SEC(MQD_ND_EXPIRY_TIME_STANDBY);
                                        TRACE_2(
                                            "%s:%u: CLM Event is coming first 
for Node down",
                                            __FILE__, __LINE__);
@@ -93,9 +95,22 @@ void mqd_clm_cluster_track_callback(
                                        pNdNode->info.nodeid = node_id;
                                        pNdNode->info.is_clm_down = true;
                                        mqd_red_db_node_add(pMqd, pNdNode);
-                                       if (pMqd->ha_state == SA_AMF_HA_ACTIVE)
-                                               mqd_del_node_down_info(pMqd,
-                                                                      node_id);
+                                       mqd_tmr_start(&pNdNode->info.timer,
+                                                       timeout);
+                               }
+                       } else if (notificationBuffer->notification[counter].
+                                       clusterChange == SA_CLM_NODE_JOINED) {
+                               pNdNode =
+                                   (MQD_ND_DB_NODE *)ncs_patricia_tree_get(
+                                       &pMqd->node_db, (uint8_t *)&node_id);
+                               if (pNdNode) {
+                                       mqd_tmr_stop(&pNdNode->info.timer);
+
+                                       if (pMqd->ha_state ==
+                                               SA_AMF_HA_ACTIVE) {
+                                               mqd_red_db_node_del(pMqd,
+                                                               pNdNode);
+                                       }
                                }
                        }
                }
@@ -111,21 +126,38 @@ void mqd_del_node_down_info(MQD_CB *pMqd, NODE_ID nodeid)
        SaImmOiHandleT immOiHandle;
        SaAisErrorT rc = SA_AIS_OK;
        SaImmOiImplementerNameT implementer_name;
+       int retries = 5;
        char i_name[256] = {0};
        SaVersionT imm_version = {'A', 0x02, 0x01};
        TRACE_ENTER2("nodeid=%u", nodeid);
 
        rc = immutil_saImmOiInitialize_2(&immOiHandle, NULL, &imm_version);
        if (rc != SA_AIS_OK)
-               TRACE_4("saImmOiInitialize_2 failed with return value=%d", rc);
+               LOG_ER("saImmOiInitialize_2 failed with return value=%d", rc);
 
        snprintf(i_name, SA_MAX_NAME_LENGTH, "%s%u", "MsgQueueService", nodeid);
        implementer_name = i_name;
 
-       rc = immutil_saImmOiImplementerSet(immOiHandle, implementer_name);
-       if (rc != SA_AIS_OK)
-               TRACE_4("saImmOiImplementerSet failed with return value=%d",
-                       rc);
+       while (retries--) {
+               rc = immutil_saImmOiImplementerSet(immOiHandle,
+                                                       implementer_name);
+               if (rc == SA_AIS_OK)
+                       break;
+               else if (rc == SA_AIS_ERR_EXIST) {
+                       /*
+                        * imm has not yet removed implementer for remote node.
+                        * try again.
+                        */
+                       osaf_nanosleep(&kOneSecond);
+                       continue;
+               }
+               else {
+                       LOG_ER("saImmOiImplementerSet failed with return value="
+                               "%d",
+                               rc);
+                       break;
+               }
+       }
 
        pNode =
            (MQD_OBJ_NODE *)ncs_patricia_tree_getnext(&pMqd->qdb, (uint8_t *)0);
@@ -142,19 +174,19 @@ void mqd_del_node_down_info(MQD_CB *pMqd, NODE_ID nodeid)
                        rc = immutil_saImmOiRtObjectDelete(immOiHandle,
                                                           &dereg.queue);
                        if (rc != SA_AIS_OK)
-                               TRACE_4(
+                               LOG_ER(
                                    "Deleting MsgQGrp object %s FAILED with 
return value=%d",
                                    dereg.queue.value, rc);
                        if (mqd_asapi_dereg_hdlr(pMqd, &dereg, NULL) !=
                            NCSCC_RC_SUCCESS)
-                               TRACE_4("mqd_asapi_dereg_hdlr failed");
+                               LOG_ER("mqd_asapi_dereg_hdlr failed");
                }
                pNode = (MQD_OBJ_NODE *)ncs_patricia_tree_getnext(
                    &pMqd->qdb, (uint8_t *)&name);
        }
        rc = immutil_saImmOiFinalize(immOiHandle);
        if (rc != SA_AIS_OK)
-               TRACE_4("saImmOiFinalize failed with return value=%d", rc);
+               LOG_ER("saImmOiFinalize failed with return value=%d", rc);
 
        /* Send an async Update to the standby */
        memset(&msg, 0, sizeof(MQD_A2S_MSG));
diff --git a/src/msg/msgd/mqd_db.h b/src/msg/msgd/mqd_db.h
index 3655f0e48..82459fb60 100644
--- a/src/msg/msgd/mqd_db.h
+++ b/src/msg/msgd/mqd_db.h
@@ -190,6 +190,6 @@ uint32_t mqd_red_db_node_add(MQD_CB *pMqd, MQD_ND_DB_NODE 
*pNode);
 uint32_t mqd_red_db_node_create(MQD_CB *pMqd, MQD_ND_DB_NODE **o_pnode);
 void mqd_red_db_node_del(MQD_CB *pMqd, MQD_ND_DB_NODE *pNode);
 void mqd_qparam_upd(MQD_OBJ_NODE *, ASAPi_QUEUE_PARAM *);
-void mqd_qparam_fill(MQD_QUEUE_PARAM *, ASAPi_QUEUE_PARAM *);
+void mqd_qparam_fill(const MQD_QUEUE_PARAM *, ASAPi_QUEUE_PARAM *);
 
 #endif  // MSG_MSGD_MQD_DB_H_
diff --git a/src/msg/msgd/mqd_evt.c b/src/msg/msgd/mqd_evt.c
index ccd08d70a..315cd689e 100644
--- a/src/msg/msgd/mqd_evt.c
+++ b/src/msg/msgd/mqd_evt.c
@@ -381,6 +381,13 @@ static uint32_t mqd_nd_status_evt_process(MQD_CB *pMqd,
                        mqd_tmr_start(&pNdNode->info.timer, timeout);
                } else {
                        TRACE_2("Deleting the nd node from MQD");
+                       mqd_tmr_stop(&pNdNode->info.timer);
+
+                       if (pMqd->ha_state == SA_AMF_HA_ACTIVE) {
+                               /* if remote node is totally down then clean up 
*/
+                               if (pNdNode->info.is_clm_down)
+                                       mqd_del_node_down_info(pMqd, node_id);
+                       }
                        mqd_red_db_node_del(pMqd, pNdNode);
                }
 
@@ -397,8 +404,6 @@ static uint32_t mqd_nd_status_evt_process(MQD_CB *pMqd,
                        pNdNode->info.dest = nd_info->dest;
                        if (pMqd->ha_state == SA_AMF_HA_ACTIVE) {
                                mqd_red_db_node_del(pMqd, pNdNode);
-                               mqd_nd_restart_update_dest_info(pMqd,
-                                                               nd_info->dest);
                                /* Send an async update event to standby MQD */
                                memset(&msg, 0, sizeof(MQD_A2S_MSG));
                                msg.type = MQD_A2S_MSG_TYPE_MQND_STATEVT;
@@ -413,6 +418,9 @@ static uint32_t mqd_nd_status_evt_process(MQD_CB *pMqd,
                                    (void *)(&msg.info.nd_stat_evt));
                        }
                }
+
+               if (pMqd->ha_state == SA_AMF_HA_ACTIVE)
+                       mqd_nd_restart_update_dest_info(pMqd, nd_info->dest);
                TRACE_1("MDS UP PROCESSED ON %d DONE", pMqd->ha_state);
        }
        TRACE_LEAVE();
diff --git a/src/msg/msgd/mqd_util.c b/src/msg/msgd/mqd_util.c
index fa42b150c..dee5863a3 100644
--- a/src/msg/msgd/mqd_util.c
+++ b/src/msg/msgd/mqd_util.c
@@ -161,7 +161,7 @@ void mqd_qparam_upd(MQD_OBJ_NODE *pNode, ASAPi_QUEUE_PARAM 
*qparam)
 
    RETURNS        :  none
 \****************************************************************************/
-void mqd_qparam_fill(MQD_QUEUE_PARAM *pParam, ASAPi_QUEUE_PARAM *pQparam)
+void mqd_qparam_fill(const MQD_QUEUE_PARAM *pParam, ASAPi_QUEUE_PARAM *pQparam)
 {
        /* Fill the Queue params */
        pQparam->retentionTime = pParam->retentionTime;
-- 
2.13.6


------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to