If multiple nodes that are hosting msg queues go down simultaneously (e.g. multiple VMs on a host, and the host goes down), msgd can take a long time to process the node downs. This blocks the main thread, so the healthcheck doesn't get processed in time; as a result msgd dies, which restarts the controller.
msgd needs to sit in a loop waiting for imm to release the implementers for each of the down nodes. For many nodes which went down simultaneously this can take up to 20 seconds when done serially. Node down logic needs to be put on a thread, so that we can continue to process other messages like healthcheck. This also allows us to parallelize the node down handling. --- src/msg/msgd/mqd_asapi.c | 17 +++-- src/msg/msgd/mqd_clm.c | 183 +++++++++++++++++++++++++++++++---------------- src/msg/msgd/mqd_evt.c | 5 ++ src/msg/msgd/mqd_mbcsv.c | 16 +++-- src/msg/msgd/mqd_ntf.cc | 4 ++ 5 files changed, 151 insertions(+), 74 deletions(-) diff --git a/src/msg/msgd/mqd_asapi.c b/src/msg/msgd/mqd_asapi.c index c44df4d2a..eb760ca8e 100644 --- a/src/msg/msgd/mqd_asapi.c +++ b/src/msg/msgd/mqd_asapi.c @@ -1298,18 +1298,18 @@ static uint32_t mqd_asapi_queue_make(MQD_OBJ_INFO *pObjInfo, "%s:%u:ERR_MEMORY:Failed To Allocate Memory for QGroups", __FILE__, __LINE__); return SA_AIS_ERR_NO_MEMORY; - return SA_AIS_ERR_NO_MEMORY; } itr.state = 0; - for (idx = 0; idx < qcnt; idx++) { - pOelm = (MQD_OBJECT_ELEM *)ncs_walk_items( - &pObjInfo->ilist, &itr); + idx = 0; + while ((pOelm = (MQD_OBJECT_ELEM *)ncs_walk_items( + &pObjInfo->ilist, &itr))) { memcpy(&pQueue[idx].name, &pOelm->pObject->name, sizeof(SaNameT)); mqd_qparam_fill(&pOelm->pObject->info.q, &pQueue[idx]); + idx++; } } } else { @@ -1632,6 +1632,8 @@ void mqd_nd_restart_update_dest_info(MQD_CB *pMqd, MDS_DEST dest) NCS_Q_ITR itr; uint32_t count = 0; + m_NCS_LOCK(&pMqd->mqd_cb_lock, NCS_LOCK_WRITE); + pObjNode = (MQD_OBJ_NODE *)ncs_patricia_tree_getnext(&pMqd->qdb, (uint8_t *)NULL); while (pObjNode) { @@ -1686,6 +1688,8 @@ void mqd_nd_restart_update_dest_info(MQD_CB *pMqd, MDS_DEST dest) pObjNode = (MQD_OBJ_NODE *)ncs_patricia_tree_getnext( &pMqd->qdb, (uint8_t *)&name); } + + m_NCS_UNLOCK(&pMqd->mqd_cb_lock, NCS_LOCK_WRITE); } /****************************************************************************\ @@ -1707,6 +1711,8 @@ void 
mqd_nd_down_update_info(MQD_CB *pMqd, MDS_DEST dest) NCS_Q_ITR itr; uint32_t count = 0; + m_NCS_LOCK(&pMqd->mqd_cb_lock, NCS_LOCK_WRITE); + pObjNode = (MQD_OBJ_NODE *)ncs_patricia_tree_getnext(&pMqd->qdb, (uint8_t *)NULL); while (pObjNode) { @@ -1757,6 +1763,9 @@ void mqd_nd_down_update_info(MQD_CB *pMqd, MDS_DEST dest) pObjNode = (MQD_OBJ_NODE *)ncs_patricia_tree_getnext( &pMqd->qdb, (uint8_t *)&name); } + + m_NCS_UNLOCK(&pMqd->mqd_cb_lock, NCS_LOCK_WRITE); + return; } diff --git a/src/msg/msgd/mqd_clm.c b/src/msg/msgd/mqd_clm.c index 41d9bcf15..0dbb21b23 100644 --- a/src/msg/msgd/mqd_clm.c +++ b/src/msg/msgd/mqd_clm.c @@ -119,84 +119,141 @@ void mqd_clm_cluster_track_callback( TRACE_LEAVE(); } -void mqd_del_node_down_info(MQD_CB *pMqd, NODE_ID nodeid) +static void * _mqd_del_node_down_info(void *arg) { - MQD_OBJ_NODE *pNode = 0; - MQD_A2S_MSG msg; - SaImmOiHandleT immOiHandle; + NODE_ID nodeid = *(NODE_ID *) arg; SaAisErrorT rc = SA_AIS_OK; - SaImmOiImplementerNameT implementer_name; - int retries = 5; - char i_name[256] = {0}; - SaVersionT imm_version = {'A', 0x02, 0x01}; + SaImmOiHandleT immOiHandle = 0; + MQD_CB *pMqd = ncshm_take_hdl(NCS_SERVICE_ID_MQD, gl_mqdinfo.inst_hdl); + TRACE_ENTER2("nodeid=%u", nodeid); - rc = immutil_saImmOiInitialize_2(&immOiHandle, NULL, &imm_version); - if (rc != SA_AIS_OK) - LOG_ER("saImmOiInitialize_2 failed with return value=%d", rc); + free(arg); - snprintf(i_name, SA_MAX_NAME_LENGTH, "%s%u", "MsgQueueService", nodeid); - implementer_name = i_name; + do { + MQD_OBJ_NODE *pNode = 0; + MQD_A2S_MSG msg; + SaImmOiImplementerNameT implementer_name; + int retries = 5; + char i_name[256] = {0}; + SaVersionT imm_version = {'A', 0x02, 0x01}; - while (retries--) { - rc = immutil_saImmOiImplementerSet(immOiHandle, - implementer_name); - if (rc == SA_AIS_OK) + if (!pMqd) { + LOG_ER("%s:%u: Instance Doesn't Exist", __FILE__, + __LINE__); break; - else if (rc == SA_AIS_ERR_EXIST) { - /* - * imm has not yet removed implementer for remote 
node. - * try again. - */ - osaf_nanosleep(&kOneSecond); - continue; } - else { - LOG_ER("saImmOiImplementerSet failed with return value=" - "%d", - rc); + + rc = immutil_saImmOiInitialize_2(&immOiHandle, NULL, + &imm_version); + if (rc != SA_AIS_OK) { + LOG_ER("saImmOiInitialize_2 failed with return value=" + "%d", rc); break; } - } - pNode = - (MQD_OBJ_NODE *)ncs_patricia_tree_getnext(&pMqd->qdb, (uint8_t *)0); - while (pNode) { - SaNameT name; - name = pNode->oinfo.name; - if (m_NCS_NODE_ID_FROM_MDS_DEST(pNode->oinfo.info.q.dest) == - nodeid) { - ASAPi_DEREG_INFO dereg; - memset(&dereg, 0, sizeof(ASAPi_DEREG_INFO)); - dereg.objtype = ASAPi_OBJ_QUEUE; - dereg.queue = pNode->oinfo.name; - - rc = immutil_saImmOiRtObjectDelete(immOiHandle, - &dereg.queue); - if (rc != SA_AIS_OK) - LOG_ER( - "Deleting MsgQGrp object %s FAILED with return value=%d", - dereg.queue.value, rc); - if (mqd_asapi_dereg_hdlr(pMqd, &dereg, NULL) != - NCSCC_RC_SUCCESS) - LOG_ER("mqd_asapi_dereg_hdlr failed"); + snprintf(i_name, SA_MAX_NAME_LENGTH, "%s%u", "MsgQueueService", + nodeid); + implementer_name = i_name; + + while (retries--) { + rc = immutil_saImmOiImplementerSet(immOiHandle, + implementer_name); + if (rc == SA_AIS_OK) + break; + else if (rc == SA_AIS_ERR_EXIST) { + /* + * imm has not yet removed implementer for + * remote node. try again. 
+ */ + osaf_nanosleep(&kOneSecond); + continue; + } + else { + LOG_ER("saImmOiImplementerSet failed with " + "return value=%d", rc); + break; + } + } + + if (rc != SA_AIS_OK) { + LOG_ER("immutil_saImmOiImplementerSet failed: %i", rc); + break; + } + + m_NCS_LOCK(&pMqd->mqd_cb_lock, NCS_LOCK_WRITE); + + pNode = (MQD_OBJ_NODE *)ncs_patricia_tree_getnext(&pMqd->qdb, + (uint8_t *)0); + while (pNode) { + SaNameT name; + name = pNode->oinfo.name; + if (m_NCS_NODE_ID_FROM_MDS_DEST( + pNode->oinfo.info.q.dest) == + nodeid) { + ASAPi_DEREG_INFO dereg; + memset(&dereg, 0, sizeof(ASAPi_DEREG_INFO)); + dereg.objtype = ASAPi_OBJ_QUEUE; + dereg.queue = pNode->oinfo.name; + + rc = immutil_saImmOiRtObjectDelete(immOiHandle, + &dereg.queue); + if (rc != SA_AIS_OK) + LOG_ER("Deleting MsgQGrp object %s " + "FAILED with return " + "value=%d", + dereg.queue.value, rc); + if (mqd_asapi_dereg_hdlr(pMqd, &dereg, NULL) != + NCSCC_RC_SUCCESS) + LOG_ER("mqd_asapi_dereg_hdlr failed"); + } + pNode = (MQD_OBJ_NODE *)ncs_patricia_tree_getnext( + &pMqd->qdb, (uint8_t *)&name); } - pNode = (MQD_OBJ_NODE *)ncs_patricia_tree_getnext( - &pMqd->qdb, (uint8_t *)&name); + + m_NCS_UNLOCK(&pMqd->mqd_cb_lock, NCS_LOCK_WRITE); + + /* Send an async Update to the standby */ + memset(&msg, 0, sizeof(MQD_A2S_MSG)); + msg.type = MQD_A2S_MSG_TYPE_MQND_TIMER_EXPEVT; + msg.info.nd_tmr_exp_evt.nodeid = nodeid; + + /* Send async update to the standby for MQD redundancy */ + mqd_a2s_async_update(pMqd, MQD_A2S_MSG_TYPE_MQND_TIMER_EXPEVT, + (void *)(&msg.info.nd_tmr_exp_evt)); + + } while (false); + + if (immOiHandle) { + rc = immutil_saImmOiFinalize(immOiHandle); + if (rc != SA_AIS_OK) + LOG_ER("saImmOiFinalize failed with return value=%d", + rc); } - rc = immutil_saImmOiFinalize(immOiHandle); - if (rc != SA_AIS_OK) - LOG_ER("saImmOiFinalize failed with return value=%d", rc); - /* Send an async Update to the standby */ - memset(&msg, 0, sizeof(MQD_A2S_MSG)); - msg.type = MQD_A2S_MSG_TYPE_MQND_TIMER_EXPEVT; - 
msg.info.nd_tmr_exp_evt.nodeid = nodeid; + if (pMqd) + ncshm_give_hdl(pMqd->hdl); - /* Send async update to the standby for MQD redundancy */ - mqd_a2s_async_update(pMqd, MQD_A2S_MSG_TYPE_MQND_TIMER_EXPEVT, - (void *)(&msg.info.nd_tmr_exp_evt)); + TRACE_LEAVE(); + return 0; +} + +void mqd_del_node_down_info(MQD_CB *pMqd, NODE_ID nodeid) +{ + pthread_t thread; + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + TRACE_ENTER(); + + NODE_ID *arg = malloc(sizeof(NODE_ID)); + *arg = nodeid; + + if (pthread_create(&thread, &attr, _mqd_del_node_down_info, arg) != 0) { + LOG_CR("pthread_create FAILED: %s", strerror(errno)); + exit(EXIT_FAILURE); + } + pthread_attr_destroy(&attr); TRACE_LEAVE(); - return; } diff --git a/src/msg/msgd/mqd_evt.c b/src/msg/msgd/mqd_evt.c index 315cd689e..753b0a750 100644 --- a/src/msg/msgd/mqd_evt.c +++ b/src/msg/msgd/mqd_evt.c @@ -282,6 +282,8 @@ uint32_t mqd_timer_expiry_evt_process(MQD_CB *pMqd, NODE_ID *nodeid) pNdNode->info.timer.tmr_id = TMR_T_NULL; } + m_NCS_LOCK(&pMqd->mqd_cb_lock, NCS_LOCK_WRITE); + pNode = (MQD_OBJ_NODE *)ncs_patricia_tree_getnext(&pMqd->qdb, (uint8_t *)0); while (pNode) { @@ -309,6 +311,9 @@ uint32_t mqd_timer_expiry_evt_process(MQD_CB *pMqd, NODE_ID *nodeid) pNode = (MQD_OBJ_NODE *)ncs_patricia_tree_getnext( &pMqd->qdb, (uint8_t *)&name); } + + m_NCS_UNLOCK(&pMqd->mqd_cb_lock, NCS_LOCK_WRITE); + rc = immutil_saImmOiFinalize(immOiHandle); if (rc != NCSCC_RC_SUCCESS) LOG_ER("saImmOiFinalize failed with return value=%d", diff --git a/src/msg/msgd/mqd_mbcsv.c b/src/msg/msgd/mqd_mbcsv.c index 5b0de15c8..60bfd1fc6 100644 --- a/src/msg/msgd/mqd_mbcsv.c +++ b/src/msg/msgd/mqd_mbcsv.c @@ -1249,20 +1249,22 @@ mqd_copy_data_to_cold_sync_structure(MQD_OBJ_INFO *obj_info, mbcsv_info->track_cnt * sizeof(MQD_A2S_TRACK_INFO)); } itr.state = 0; + index = 0; - for (index = 0; index < mbcsv_info->ilist_cnt; index++) { - - pOelm = - (MQD_OBJECT_ELEM 
*)ncs_walk_items(&obj_info->ilist, &itr); + while ((pOelm = (MQD_OBJECT_ELEM *)ncs_walk_items(&obj_info->ilist, + &itr))) { mbcsv_info->ilist_info[index] = pOelm->pObject->name; + index++; } + itr.state = 0; - for (index = 0; index < mbcsv_info->track_cnt; index++) { + index = 0; - pTrkObj = - (MQD_TRACK_OBJ *)ncs_walk_items(&obj_info->tlist, &itr); + while ((pTrkObj = (MQD_TRACK_OBJ *)ncs_walk_items(&obj_info->tlist, + &itr))) { mbcsv_info->track_info[index].dest = pTrkObj->dest; mbcsv_info->track_info[index].to_svc = pTrkObj->to_svc; + index++; } TRACE_LEAVE(); return NCSCC_RC_SUCCESS; diff --git a/src/msg/msgd/mqd_ntf.cc b/src/msg/msgd/mqd_ntf.cc index ab2919313..041aa60f9 100644 --- a/src/msg/msgd/mqd_ntf.cc +++ b/src/msg/msgd/mqd_ntf.cc @@ -120,6 +120,8 @@ static void updateQueueGroup(MQD_CB *cb, typedef std::vector<SaNameT> GroupList; GroupList groupList; + m_NCS_LOCK(&cb->mqd_cb_lock, NCS_LOCK_WRITE); + // maybe there is a better way to do this? for (MQD_OBJ_NODE *objNode(reinterpret_cast<MQD_OBJ_NODE *> (ncs_patricia_tree_getnext(&cb->qdb, 0))); @@ -156,6 +158,8 @@ static void updateQueueGroup(MQD_CB *cb, } } + m_NCS_UNLOCK(&cb->mqd_cb_lock, NCS_LOCK_WRITE); + if (member && ((status == SA_MSG_QUEUE_CAPACITY_REACHED && allFull) || (status == SA_MSG_QUEUE_CAPACITY_AVAILABLE && allFull))) { for (auto& it : groupList) -- 2.13.6 ------------------------------------------------------------------------------ Check out the vibrant tech community on one of the world's most engaging tech sites, Slashdot.org! http://sdm.link/slashdot _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel