Hi Alex,
I agree, but should we not skip running this test if it is being executed on the active controller? Thank you Srinivas From: Alex Jones [mailto:[email protected]] Sent: Wednesday, November 8, 2017 6:44 PM To: Srinivas Mangipudy <[email protected]> Cc: [email protected] Subject: Re: [PATCH 1/1] msg: add support for critical capacity thresholds [#2625] Hi Srinivas, You need to run that test on a node that is not the active controller. Alex _____ From: Srinivas Mangipudy <[email protected]> Sent: Wednesday, November 8, 2017 4:19:14 AM To: Alex Jones Cc: [email protected] Subject: RE: [PATCH 1/1] msg: add support for critical capacity thresholds [#2625] _____ NOTICE: This email was received from an EXTERNAL sender _____ Hi Alex, I executed "msgtest" and it failed with the following error: Suite 26: SA_AIS_ERR_UNAVAILABLE Test Suite error - saImmOmAdminOperationInvoke_2 admin-op RETURNED: SA_AIS_ERR_NOT_SUPPORTED (19) The process tried to execute the command "immadm -o 2 safNode=SC-2,safCluster=myClmCluster" and it returned the SA_AIS_ERR_NOT_SUPPORTED error. The logs contained the following: Nov 8 14:42:45 osaf-VirtualBox osafclmd[3943]: NO Lock on active node not allowed Nov 8 14:42:45 osaf-VirtualBox osafclmd[3943]: NO clms_imm_node_lock failed Nov 8 14:43:20 osaf-VirtualBox osafclmd[3943]: NO Lock on active node not allowed Nov 8 14:43:20 osaf-VirtualBox osafclmd[3943]: NO clms_imm_node_lock failed Thank you Srinivas -----Original Message----- From: Alex Jones [mailto:[email protected]] Sent: Tuesday, November 7, 2017 11:45 PM To: Srinivas Mangipudy <[email protected]> Cc: [email protected]; Alex Jones <[email protected]> Subject: [PATCH 1/1] msg: add support for critical capacity thresholds [#2625] This commit implements Sections 3.4.12 and 6.2.2 of the B.03.01 MSG spec. 
--- src/msg/Makefile.am | 8 +- src/msg/agent/mqa_api.c | 330 +++++++++ src/msg/apitest/test_CapacityThresholds.cc | 1049 ++++++++++++++++++++++++++++ src/msg/apitest/test_ErrUnavailable.cc | 110 ++- src/msg/common/mqsv_edu.c | 74 +- src/msg/common/mqsv_evt.h | 15 +- src/msg/msgd/mqd_api.c | 64 +- src/msg/msgd/mqd_db.h | 4 + src/msg/msgd/mqd_ntf.cc | 310 ++++++++ src/msg/msgd/mqd_ntf.h | 49 ++ src/msg/msgnd/mqnd_db.h | 6 + src/msg/msgnd/mqnd_evt.c | 359 +++++++++- src/msg/msgnd/mqnd_imm.c | 5 +- src/msg/msgnd/mqnd_util.c | 4 + 14 files changed, 2374 insertions(+), 13 deletions(-) create mode 100644 src/msg/apitest/test_CapacityThresholds.cc create mode 100644 src/msg/msgd/mqd_ntf.cc create mode 100644 src/msg/msgd/mqd_ntf.h diff --git a/src/msg/Makefile.am b/src/msg/Makefile.am index aeec6ca..c308072 100644 --- a/src/msg/Makefile.am +++ b/src/msg/Makefile.am @@ -97,6 +97,7 @@ noinst_HEADERS += \ src/msg/msgd/mqd_dl_api.h \ src/msg/msgd/mqd_imm.h \ src/msg/msgd/mqd_mem.h \ + src/msg/msgd/mqd_ntf.h \ src/msg/msgd/mqd_red.h \ src/msg/msgd/mqd_saf.h \ src/msg/msgd/mqd_tmr.h \ @@ -158,6 +159,7 @@ bin_osafmsgnd_LDADD = \ lib/libmsg_common.la \ lib/libSaAmf.la \ lib/libSaClm.la \ + lib/libSaNtf.la \ lib/libosaf_common.la \ lib/libSaImmOi.la \ lib/libSaImmOm.la \ @@ -182,7 +184,8 @@ bin_osafmsgd_SOURCES = \ src/msg/msgd/mqd_mds.c \ src/msg/msgd/mqd_red.c \ src/msg/msgd/mqd_saf.c \ - src/msg/msgd/mqd_tmr.c + src/msg/msgd/mqd_tmr.c \ + src/msg/msgd/mqd_ntf.cc bin_osafmsgd_LDADD = \ lib/libmsg_common.la \ @@ -191,6 +194,7 @@ bin_osafmsgd_LDADD = \ lib/libosaf_common.la \ lib/libSaImmOi.la \ lib/libSaImmOm.la \ + lib/libSaNtf.la \ lib/libopensaf_core.la if ENABLE_TESTS @@ -205,6 +209,7 @@ noinst_HEADERS += \ bin_msgtest_SOURCES = \ src/msg/apitest/msgtest.c \ src/msg/apitest/test_saMsgVersionT.cc \ + src/msg/apitest/test_CapacityThresholds.cc \ src/msg/apitest/test_ErrUnavailable.cc \ src/msg/apitest/tet_mqa_conf.c \ src/msg/apitest/tet_mqsv_util.c \ @@ -212,6 +217,7 @@ 
bin_msgtest_SOURCES = \ bin_msgtest_LDADD = \ lib/libSaMsg.la \ + lib/libSaNtf.la \ lib/libopensaf_core.la \ lib/libapitest.la diff --git a/src/msg/agent/mqa_api.c b/src/msg/agent/mqa_api.c index eeec00e..0070aaf 100644 --- a/src/msg/agent/mqa_api.c +++ b/src/msg/agent/mqa_api.c @@ -5600,6 +5600,336 @@ done: return rc; } +/**************************************************************************** + Name : saMsgQueueCapacityThresholdsSet + + Description : This routine sets the critical capacity thresholds for the + queue. + + Arguments : SaMsgQueueHandleT queueHandle + : const SaMsgQueueThresholdsT *thresholds + + Return Values : SaAisErrorT + + Notes : None +******************************************************************************/ +SaAisErrorT saMsgQueueCapacityThresholdsSet(SaMsgQueueHandleT queueHandle, + const SaMsgQueueThresholdsT *thresholds) +{ + SaAisErrorT rc = SA_AIS_OK; + MQA_CB *mqa_cb; + bool locked = false; + + do { + MQSV_EVT cap_evt; + MQSV_EVT *out_evt = 0; + MQA_QUEUE_INFO *queue_node; + uint8_t mds_rc; + int i; + + TRACE_ENTER2("SaMsgQueueHandle %llu ", queueHandle); + + /* retrieve MQA CB */ + mqa_cb = (MQA_CB *)m_MQSV_MQA_RETRIEVE_MQA_CB; + if (!mqa_cb) { + TRACE_2("ERR_BAD_HANDLE: Control block retrieval " + "failed"); + rc = SA_AIS_ERR_BAD_HANDLE; + break; + } + + if (m_NCS_LOCK(&mqa_cb->cb_lock, NCS_LOCK_WRITE) != + NCSCC_RC_SUCCESS) { + TRACE_4("ERR_LIBRARY: Lock failed for control block " + "write"); + rc = SA_AIS_ERR_LIBRARY; + break; + } + + locked = true; + + /* Check if queueHandle is present in the tree */ + if ((queue_node = mqa_queue_tree_find_and_add( + mqa_cb, queueHandle, false, NULL, 0)) == NULL) { + TRACE_2("ERR_BAD_HANDLE: Queue Database Find Failed"); + rc = SA_AIS_ERR_BAD_HANDLE; + break; + } + + if (queue_node->client_info->version.majorVersion < + MQA_MAJOR_VERSION) { + TRACE_2("ERR_VERSION: client not B.03.01"); + rc = SA_AIS_ERR_VERSION; + break; + } + + if (queue_node->client_info->version.majorVersion == + 
MQA_MAJOR_VERSION) { + if (!mqa_cb->clm_node_joined || + queue_node->client_info->isStale) { + TRACE_2("ERR_UNAVAILABLE: node is not cluster " + "member"); + rc = SA_AIS_ERR_UNAVAILABLE; + break; + } + } + + if (!thresholds) { + TRACE_2("ERR_INVALID_PARAM: thresholds is 0"); + rc = SA_AIS_ERR_INVALID_PARAM; + break; + } + + for (i = SA_MSG_MESSAGE_HIGHEST_PRIORITY; + i <= SA_MSG_MESSAGE_LOWEST_PRIORITY; + i++) { + if (thresholds->capacityAvailable[i] > + thresholds->capacityReached[i]) { + TRACE_2("ERR_INVALID_PARAM: Available greater " + "than Reached"); + rc = SA_AIS_ERR_INVALID_PARAM; + break; + } + } + + if (rc != SA_AIS_OK) + break; + + /* Check if mqnd is up */ + if (!mqa_cb->is_mqnd_up) { + TRACE_2("ERR_TRY_AGAIN: MQND is down"); + rc = SA_AIS_ERR_TRY_AGAIN; + break; + } + + /* populate the structure */ + memset(&cap_evt, 0, sizeof(MQSV_EVT)); + cap_evt.type = MQSV_EVT_MQP_REQ; + cap_evt.msg.mqp_req.type = MQP_EVT_CAP_SET_REQ; + cap_evt.msg.mqp_req.info.capacity.queueHandle = queueHandle;; + cap_evt.msg.mqp_req.info.capacity.thresholds = *thresholds; + cap_evt.msg.mqp_req.agent_mds_dest = mqa_cb->mqa_mds_dest; + + /* send the request to the MQND */ + mds_rc = mqa_mds_msg_sync_send(mqa_cb->mqa_mds_hdl, + &mqa_cb->mqnd_mds_dest, + &cap_evt, + &out_evt, + MQSV_WAIT_TIME); + + switch (mds_rc) { + case NCSCC_RC_SUCCESS: + break; + + case NCSCC_RC_REQ_TIMOUT: + TRACE_2("ERR_TIMEOUT: Message Send through MDS " + "Timeout %" PRIx64, + mqa_cb->mqa_mds_dest); + rc = SA_AIS_ERR_TIMEOUT; + break; + + case NCSCC_RC_FAILURE: + TRACE_2("ERR_TRY_AGAIN: Message Send through " + "MDS Failure %" PRIx64, + mqa_cb->mqa_mds_dest); + rc = SA_AIS_ERR_TRY_AGAIN; + break; + + default: + TRACE_4("ERR_RESOURCES: Message Send through " + "MDS Failure %" PRIx64, + mqa_cb->mqa_mds_dest); + rc = SA_AIS_ERR_NO_RESOURCES; + break; + } + + if (mds_rc != NCSCC_RC_SUCCESS) + break; + + if (out_evt) { + rc = out_evt->msg.mqp_rsp.error; + m_MMGR_FREE_MQA_EVT(out_evt); + } else { + 
TRACE_4("ERR_RESOURCES: Response not received from " + "MQND"); + rc = SA_AIS_ERR_NO_RESOURCES; + } + } while (false); + + if (locked) + m_NCS_UNLOCK(&mqa_cb->cb_lock, NCS_LOCK_WRITE); + + if (mqa_cb) + m_MQSV_MQA_GIVEUP_MQA_CB; + + if (rc == SA_AIS_OK) { + TRACE_LEAVE2(" Success "); + } else { + TRACE_LEAVE2(" Failed with return code - %d ", rc); + } + + return rc; +} + +/**************************************************************************** + Name : saMsgQueueCapacityThresholdsGet + + Description : This routine gets the critical capacity thresholds for the + queue. + + Arguments : SaMsgQueueHandleT queueHandle + : SaMsgQueueThresholdsT *thresholds + + Return Values : SaAisErrorT + + Notes : None +******************************************************************************/ +SaAisErrorT saMsgQueueCapacityThresholdsGet(SaMsgQueueHandleT queueHandle, + SaMsgQueueThresholdsT *thresholds) +{ + SaAisErrorT rc = SA_AIS_OK; + MQA_CB *mqa_cb; + bool locked = false; + + do { + MQSV_EVT cap_evt; + MQSV_EVT *out_evt = 0; + MQA_QUEUE_INFO *queue_node; + uint8_t mds_rc; + + TRACE_ENTER2("SaMsgQueueHandle %llu ", queueHandle); + + /* retrieve MQA CB */ + mqa_cb = (MQA_CB *)m_MQSV_MQA_RETRIEVE_MQA_CB; + if (!mqa_cb) { + TRACE_2("ERR_BAD_HANDLE: Control block retrieval " + "failed"); + rc = SA_AIS_ERR_BAD_HANDLE; + break; + } + + if (m_NCS_LOCK(&mqa_cb->cb_lock, NCS_LOCK_WRITE) != + NCSCC_RC_SUCCESS) { + TRACE_4("ERR_LIBRARY: Lock failed for control block " + "write"); + rc = SA_AIS_ERR_LIBRARY; + break; + } + + locked = true; + + /* Check if queueHandle is present in the tree */ + if ((queue_node = mqa_queue_tree_find_and_add( + mqa_cb, queueHandle, false, NULL, 0)) == NULL) { + TRACE_2("ERR_BAD_HANDLE: Queue Database Find Failed"); + rc = SA_AIS_ERR_BAD_HANDLE; + break; + } + + if (queue_node->client_info->version.majorVersion < + MQA_MAJOR_VERSION) { + TRACE_2("ERR_VERSION: client not B.03.01"); + rc = SA_AIS_ERR_VERSION; + break; + } + + if 
(queue_node->client_info->version.majorVersion == + MQA_MAJOR_VERSION) { + if (!mqa_cb->clm_node_joined || + queue_node->client_info->isStale) { + TRACE_2("ERR_UNAVAILABLE: node is not cluster " + "member"); + rc = SA_AIS_ERR_UNAVAILABLE; + break; + } + } + + if (!thresholds) { + TRACE_2("ERR_INVALID_PARAM: thresholds is 0"); + rc = SA_AIS_ERR_INVALID_PARAM; + break; + } + + /* Check if mqnd is up */ + if (!mqa_cb->is_mqnd_up) { + TRACE_2("ERR_TRY_AGAIN: MQD or MQND is down"); + rc = SA_AIS_ERR_TRY_AGAIN; + break; + } + + /* populate the structure */ + memset(&cap_evt, 0, sizeof(MQSV_EVT)); + cap_evt.type = MQSV_EVT_MQP_REQ; + cap_evt.msg.mqp_req.type = MQP_EVT_CAP_GET_REQ; + cap_evt.msg.mqp_req.info.capacity.queueHandle = queueHandle;; + cap_evt.msg.mqp_req.agent_mds_dest = mqa_cb->mqa_mds_dest; + + /* send the request to the MQND */ + mds_rc = mqa_mds_msg_sync_send(mqa_cb->mqa_mds_hdl, + &mqa_cb->mqnd_mds_dest, + &cap_evt, + &out_evt, + MQSV_WAIT_TIME); + + switch (mds_rc) { + case NCSCC_RC_SUCCESS: + break; + + case NCSCC_RC_REQ_TIMOUT: + TRACE_2("ERR_TIMEOUT: Message Send through MDS " + "Timeout %" PRIx64, + mqa_cb->mqa_mds_dest); + rc = SA_AIS_ERR_TIMEOUT; + break; + + case NCSCC_RC_FAILURE: + TRACE_2("ERR_TRY_AGAIN: Message Send through " + "MDS Failure %" PRIx64, + mqa_cb->mqa_mds_dest); + rc = SA_AIS_ERR_TRY_AGAIN; + break; + + default: + TRACE_4("ERR_RESOURCES: Message Send through " + "MDS Failure %" PRIx64, + mqa_cb->mqa_mds_dest); + rc = SA_AIS_ERR_NO_RESOURCES; + break; + } + + if (mds_rc != NCSCC_RC_SUCCESS) + break; + + if (out_evt) { + rc = out_evt->msg.mqp_rsp.error; + + if (rc == SA_AIS_OK) { + *thresholds = out_evt->msg.mqp_rsp.info. 
+ capacity.thresholds; + } + + m_MMGR_FREE_MQA_EVT(out_evt); + } else { + TRACE_4("ERR_RESOURCES: Response not received from " + "MQND"); + rc = SA_AIS_ERR_NO_RESOURCES; + } + } while (false); + + if (locked) + m_NCS_UNLOCK(&mqa_cb->cb_lock, NCS_LOCK_WRITE); + + if (mqa_cb) + m_MQSV_MQA_GIVEUP_MQA_CB; + + if (rc == SA_AIS_OK) { + TRACE_LEAVE2(" Success "); + } else { + TRACE_LEAVE2(" Failed with return code - %d ", rc); + } + + return rc; +} /***************************************************************************** Name : msgget_timer_expired diff --git a/src/msg/apitest/test_CapacityThresholds.cc b/src/msg/apitest/test_CapacityThresholds.cc new file mode 100644 index 0000000..38d2144 --- /dev/null +++ b/src/msg/apitest/test_CapacityThresholds.cc @@ -0,0 +1,1049 @@ +#include <cassert> +#include <condition_variable> +#include <cstdio> +#include <cstring> +#include <iostream> +#include <mutex> +#include <queue> +#include <thread> +#include <sys/poll.h> +#include "msg/apitest/msgtest.h" +#include "msg/apitest/tet_mqsv.h" +#include <saMsg.h> +#include <saNtf.h> + +static SaVersionT msg3_1 = {'B', 3, 0}; + +typedef std::queue<SaMsgMessageCapacityStatusT> TestQueue; +typedef std::queue<int> ResultQueue; +static TestQueue testQueue; +static ResultQueue resultQueue; +static std::mutex m; +static std::condition_variable cv; +static bool ready(false); +static bool pollDone(false); + +static const SaNameT queueName = { + sizeof("safMq=TestQueue") - 1, + "safMq=TestQueue" +}; + +static const SaNameT queueGroupName = { + sizeof("safMqg=TestQueueGroup") - 1, + "safMqg=TestQueueGroup" +}; + +static const SaMsgQueueCreationAttributesT creationAttributes = { + 0, + { 5, 5, 5, 5 }, + SA_TIME_ONE_SECOND +}; + +static SaMsgQueueHandleT openQueue(SaMsgHandleT msgHandle) { + SaMsgQueueHandleT queueHandle; + + SaAisErrorT rc(saMsgQueueOpen(msgHandle, + &queueName, + &creationAttributes, + SA_MSG_QUEUE_CREATE, + SA_TIME_MAX, + &queueHandle)); + assert(rc == SA_AIS_OK); + + return 
queueHandle; +} + + +static void ntfCallback(SaNtfSubscriptionIdT subscriptionId, + const SaNtfNotificationsT *n) { + int asyncTestResult(TET_FAIL); + int asyncTest(testQueue.front()); + testQueue.pop(); + + do { + if (n->notificationType != SA_NTF_TYPE_STATE_CHANGE) { + break; + } + + if (n->notification.stateChangeNotification.notificationHeader.notificationClassId->vendorId != SA_NTF_VENDOR_ID_SAF) { + break; + } + + if (n->notification.stateChangeNotification.notificationHeader.notificationClassId->majorId != SA_SVC_MSG) { + break; + } + + if (n->notification.stateChangeNotification.numStateChanges != 1) { + break; + } + + if (*n->notification.stateChangeNotification.sourceIndicator != SA_NTF_OBJECT_OPERATION) { + break; + } + + if (n->notification.stateChangeNotification.changedStates[0].stateId != SA_MSG_DEST_CAPACITY_STATUS) { + break; + } + + if (n->notification.stateChangeNotification.notificationHeader.notifyingObject->length != sizeof("safApp=safMsgService") - 1) { + break; + } + + if (memcmp(n->notification.stateChangeNotification.notificationHeader.notifyingObject->value, + "safApp=safMsgService", + n->notification.stateChangeNotification.notificationHeader.notifyingObject->length)) { + break; + } + + if (asyncTest == SA_MSG_QUEUE_CAPACITY_REACHED) { + if (n->notification.stateChangeNotification.notificationHeader.notificationClassId->minorId != 0x65) { + break; + } + + if (n->notification.stateChangeNotification.changedStates[0].newState != SA_MSG_QUEUE_CAPACITY_REACHED) { + break; + } + + if (n->notification.stateChangeNotification.changedStates[0].oldStatePresent != SA_FALSE) { + break; + } + + asyncTestResult = TET_PASS; + } else if (asyncTest == SA_MSG_QUEUE_CAPACITY_AVAILABLE) { + if (n->notification.stateChangeNotification.notificationHeader.notificationClassId->minorId != 0x66) { + break; + } + + if (n->notification.stateChangeNotification.changedStates[0].newState != SA_MSG_QUEUE_CAPACITY_AVAILABLE) { + break; + } + + if 
(n->notification.stateChangeNotification.changedStates[0].oldStatePresent != SA_TRUE) { + break; + } + + if (n->notification.stateChangeNotification.changedStates[0].oldState != SA_MSG_QUEUE_CAPACITY_REACHED) { + break; + } + + asyncTestResult = TET_PASS; + } else if (asyncTest == SA_MSG_QUEUE_GROUP_CAPACITY_REACHED) { + if (n->notification.stateChangeNotification.notificationHeader.notificationClassId->minorId != 0x67) { + break; + } + + if (n->notification.stateChangeNotification.changedStates[0].newState != SA_MSG_QUEUE_GROUP_CAPACITY_REACHED) { + break; + } + + if (n->notification.stateChangeNotification.changedStates[0].oldStatePresent != SA_FALSE) { + break; + } + + asyncTestResult = TET_PASS; + } else if (asyncTest == SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE) { + if (n->notification.stateChangeNotification.notificationHeader.notificationClassId->minorId != 0x68) { + break; + } + + if (n->notification.stateChangeNotification.changedStates[0].newState != SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE) { + break; + } + + if (n->notification.stateChangeNotification.changedStates[0].oldStatePresent != SA_TRUE) { + break; + } + + if (n->notification.stateChangeNotification.changedStates[0].oldState != SA_MSG_QUEUE_GROUP_CAPACITY_REACHED) { + break; + } + + asyncTestResult = TET_PASS; + } else { + assert(false); + break; + } + } while (false); + + resultQueue.push(asyncTestResult); + + if (testQueue.empty()) { + { + std::lock_guard<std::mutex> lk(m); + ready = true; + } + + cv.notify_one(); + } +} + +static void testNtfStateChange(SaNtfHandleT ntfHandle, + SaSelectionObjectT ntfSelObj) { + pollfd fd = { static_cast<int>(ntfSelObj), POLLIN }; + + do { + if (pollDone) + break; + + if (poll(&fd, 1, 1000) < 0) { + m_TET_MQSV_PRINTF("poll FAILED: %i\n", errno); + break; + } + + if (fd.revents & POLLIN) { + SaAisErrorT rc(saNtfDispatch(ntfHandle, SA_DISPATCH_ONE)); + + if (rc != SA_AIS_OK) { + if (rc != SA_AIS_ERR_BAD_HANDLE) + std::cerr << "saNtfDispatch failed: " << rc << 
std::endl; + break; + } + } + } while (true); +} + +static int testNtfInit(SaNtfHandleT& ntfHandle, SaSelectionObjectT& ntfSelObj) { + int result(TET_PASS); + + do { + SaNtfCallbacksT callbacks = { ntfCallback, 0 }; + SaVersionT version = { 'A', 1, 0 }; + SaNtfStateChangeNotificationFilterT stateChangeFilter; + + SaAisErrorT rc(saNtfInitialize(&ntfHandle, &callbacks, &version)); + if (rc != SA_AIS_OK) { + result = TET_FAIL; + break; + } + + rc = saNtfStateChangeNotificationFilterAllocate( + ntfHandle, &stateChangeFilter, 0, 0, 0, 0, 0, 0); + + if (rc != SA_AIS_OK) { + result = TET_FAIL; + break; + } + + SaNtfNotificationTypeFilterHandlesT notificationFilterHandles = { + 0, 0, stateChangeFilter.notificationFilterHandle, 0, 0 + }; + + rc = saNtfNotificationSubscribe(&notificationFilterHandles, 1); + + if (rc != SA_AIS_OK) { + result = TET_FAIL; + break; + } + + rc = saNtfNotificationFilterFree(stateChangeFilter.notificationFilterHandle); + if (rc != SA_AIS_OK) { + result = TET_FAIL; + break; + } + + rc = saNtfSelectionObjectGet(ntfHandle, &ntfSelObj); + if (rc != SA_AIS_OK) { + result = TET_FAIL; + break; + } + + pollDone = false; + } while (false); + + return result; +} + +static int testNtfCleanup(SaNtfHandleT ntfHandle) { + int result(TET_PASS); + SaAisErrorT rc(saNtfFinalize(ntfHandle)); + if (rc != SA_AIS_OK) + result = TET_FAIL; + + pollDone = true; + + return result; +} + +static void capacityThresholds_01(void) { + SaMsgQueueThresholdsT thresholds; + SaAisErrorT rc(saMsgQueueCapacityThresholdsSet(0xdeadbeef, &thresholds)); + aisrc_validate(rc, SA_AIS_ERR_BAD_HANDLE); +} + +static void capacityThresholds_02(void) { + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); + + SaMsgQueueThresholdsT thresholds; + rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds); + 
aisrc_validate(rc, SA_AIS_ERR_BAD_HANDLE); +} + +static void capacityThresholds_03(void) { + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + rc = saMsgQueueClose(queueHandle); + assert(rc == SA_AIS_OK); + + SaMsgQueueThresholdsT thresholds; + rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds); + aisrc_validate(rc, SA_AIS_ERR_BAD_HANDLE); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); +} + +static void capacityThresholds_04(void) { + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + rc = saMsgQueueCapacityThresholdsSet(queueHandle, 0); + aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); +} + +static void capacityThresholds_05(void) { + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + const SaMsgQueueThresholdsT thresholds = { + { 4, 5, 5, 5 }, { 5, 5, 5, 5 } + }; + + rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds); + aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); +} + +static void capacityThresholds_06(void) { + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + const SaMsgQueueThresholdsT thresholds = { + { 5, 4, 5, 5 }, { 5, 5, 5, 5 } + }; + + rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds); + aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); +} + +static void capacityThresholds_07(void) { + SaMsgHandleT msgHandle; + SaAisErrorT rc = 
saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + const SaMsgQueueThresholdsT thresholds = { + { 5, 5, 4, 5 }, { 5, 5, 5, 5 } + }; + + rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds); + aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); +} + +static void capacityThresholds_08(void) { + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + const SaMsgQueueThresholdsT thresholds = { + { 5, 5, 5, 4 }, { 5, 5, 5, 5 } + }; + + rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds); + aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); +} + +static void capacityThresholds_09(void) { + SaMsgHandleT msgHandle; + SaVersionT msg1_1 = {'B', 1, 0}; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg1_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + const SaMsgQueueThresholdsT thresholds = { + { 5, 5, 5, 5 }, { 5, 5, 5, 5 } + }; + + rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds); + aisrc_validate(rc, SA_AIS_ERR_VERSION); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); +} + +static void capacityThresholds_10(void) { + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + const SaMsgQueueThresholdsT thresholds = { + { 6, 5, 5, 5 }, { 5, 5, 5, 5 } + }; + + rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds); + aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); +} + +static void capacityThresholds_11(void) { + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + 
assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + const SaMsgQueueThresholdsT thresholds = { + { 5, 6, 5, 5 }, { 5, 5, 5, 5 } + }; + + rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds); + aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); +} + +static void capacityThresholds_12(void) { + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + const SaMsgQueueThresholdsT thresholds = { + { 5, 5, 6, 5 }, { 5, 5, 5, 5 } + }; + + rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds); + aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); +} + +static void capacityThresholds_13(void) { + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + const SaMsgQueueThresholdsT thresholds = { + { 5, 5, 5, 6 }, { 5, 5, 5, 5 } + }; + + rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds); + aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); +} + +static void capacityThresholds_14(void) { + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + const SaMsgQueueThresholdsT thresholds = { + { 5, 5, 5, 5 }, { 5, 5, 5, 5 } + }; + + rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds); + aisrc_validate(rc, SA_AIS_OK); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); +} + +static void capacityThresholds_15(void) { + SaMsgQueueThresholdsT thresholds; + SaAisErrorT rc(saMsgQueueCapacityThresholdsGet(0xdeadbeef, &thresholds)); + aisrc_validate(rc, SA_AIS_ERR_BAD_HANDLE); +} + +static void 
capacityThresholds_16(void) { + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); + + SaMsgQueueThresholdsT thresholds; + rc = saMsgQueueCapacityThresholdsGet(queueHandle, &thresholds); + aisrc_validate(rc, SA_AIS_ERR_BAD_HANDLE); +} + +static void capacityThresholds_17(void) { + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + rc = saMsgQueueClose(queueHandle); + assert(rc == SA_AIS_OK); + + SaMsgQueueThresholdsT thresholds; + rc = saMsgQueueCapacityThresholdsGet(queueHandle, &thresholds); + aisrc_validate(rc, SA_AIS_ERR_BAD_HANDLE); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); +} + +static void capacityThresholds_18(void) { + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + rc = saMsgQueueCapacityThresholdsGet(queueHandle, 0); + aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); +} + +static void capacityThresholds_19(void) { + SaMsgHandleT msgHandle; + SaVersionT msg1_1 = {'B', 1, 0}; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg1_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + SaMsgQueueThresholdsT thresholds; + rc = saMsgQueueCapacityThresholdsGet(queueHandle, &thresholds); + aisrc_validate(rc, SA_AIS_ERR_VERSION); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); +} + +static void capacityThresholds_20(void) { + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + const 
SaMsgQueueThresholdsT thresholds = { + { 1, 2, 3, 4 }, { 1, 2, 3, 4 } + }; + + rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds); + assert(rc == SA_AIS_OK); + + SaMsgQueueThresholdsT thresholdsGet; + rc = saMsgQueueCapacityThresholdsGet(queueHandle, &thresholdsGet); + aisrc_validate(rc, SA_AIS_OK); + + assert(!memcmp(&thresholds, &thresholdsGet, sizeof(SaMsgQueueThresholdsT))); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); +} + +static void capacityThresholds_21(void) { + int result(TET_PASS); + + do { + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + const SaMsgQueueThresholdsT thresholds = { + { 3, 3, 3, 3 }, { 2, 2, 2, 2 } + }; + + rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds); + assert(rc == SA_AIS_OK); + + SaNtfHandleT ntfHandle; + SaSelectionObjectT ntfSelObj; + result = testNtfInit(ntfHandle, ntfSelObj); + if (result != TET_PASS) + break; + + ready = false; + std::thread t(&testNtfStateChange, ntfHandle, ntfSelObj); + testQueue.push(SA_MSG_QUEUE_CAPACITY_REACHED); + + /* fill up the q */ + for (SaUint8T i(SA_MSG_MESSAGE_HIGHEST_PRIORITY); + i <= SA_MSG_MESSAGE_LOWEST_PRIORITY; + i++) { + char data[] = "abcd"; + + SaMsgMessageT message = { + 1, 1, sizeof(data) - 1, 0, data, i + }; + + rc = saMsgMessageSend(msgHandle, &queueName, &message, SA_TIME_ONE_SECOND); + assert(rc == SA_AIS_OK); + } + + // wait for the notifications + { + std::unique_lock<std::mutex> lk(m); + cv.wait(lk, []{ return ready; }); + } + + result = resultQueue.front(); + resultQueue.pop(); + if (result != TET_PASS) + break; + + result = testNtfCleanup(ntfHandle); + if (result != TET_PASS) + break; + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); + + t.join(); + } while (false); + + test_validate(result, TET_PASS); + + // let the queue close + sleep(2); +} + +static void capacityThresholds_22(void) { + int 
result(TET_PASS); + + do { + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + const SaMsgQueueThresholdsT thresholds = { + { 3, 3, 3, 3 }, { 2, 2, 2, 2 } + }; + + rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds); + assert(rc == SA_AIS_OK); + + SaNtfHandleT ntfHandle; + SaSelectionObjectT ntfSelObj; + result = testNtfInit(ntfHandle, ntfSelObj); + if (result != TET_PASS) + break; + + ready = false; + std::thread t(&testNtfStateChange, ntfHandle, ntfSelObj); + testQueue.push(SA_MSG_QUEUE_CAPACITY_REACHED); + + /* fill up the q */ + for (SaUint8T i(SA_MSG_MESSAGE_HIGHEST_PRIORITY); + i <= SA_MSG_MESSAGE_LOWEST_PRIORITY; + i++) { + char data[] = "abcd"; + + SaMsgMessageT message = { + 1, 1, sizeof(data) - 1, 0, data, i + }; + + rc = saMsgMessageSend(msgHandle, &queueName, &message, SA_TIME_ONE_SECOND); + assert(rc == SA_AIS_OK); + } + + // wait for the notifications + { + std::unique_lock<std::mutex> lk(m); + cv.wait(lk, []{ return ready; }); + } + + result = resultQueue.front(); + resultQueue.pop(); + if (result != TET_PASS) + break; + + ready = false; + + testQueue.push(SA_MSG_QUEUE_CAPACITY_AVAILABLE); + + /* read a message */ + char data[4]; + SaMsgMessageT msg; + SaMsgSenderIdT senderId; + memset(&msg, 0, sizeof(msg)); + msg.data = &data; + msg.size = sizeof(data); + + rc = saMsgMessageGet(queueHandle, &msg, 0, &senderId, SA_TIME_ONE_SECOND); + assert(rc == SA_AIS_OK); + + // wait for the notifications + { + std::unique_lock<std::mutex> lk(m); + cv.wait(lk, []{ return ready; }); + } + + result = resultQueue.front(); + resultQueue.pop(); + if (result != TET_PASS) + break; + + result = testNtfCleanup(ntfHandle); + if (result != TET_PASS) + break; + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); + + t.join(); + } while (false); + + test_validate(result, TET_PASS); + + // let the queue close + sleep(2); +} + +static void 
capacityThresholds_23(void) { + int result(TET_PASS); + + do { + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + const SaMsgQueueThresholdsT thresholds = { + { 3, 3, 3, 3 }, { 2, 2, 2, 2 } + }; + + rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds); + assert(rc == SA_AIS_OK); + + rc = saMsgQueueGroupCreate(msgHandle, + &queueGroupName, + SA_MSG_QUEUE_GROUP_ROUND_ROBIN); + assert(rc == SA_AIS_OK); + + rc = saMsgQueueGroupInsert(msgHandle, &queueGroupName, &queueName); + assert(rc == SA_AIS_OK); + + SaNtfHandleT ntfHandle; + SaSelectionObjectT ntfSelObj; + result = testNtfInit(ntfHandle, ntfSelObj); + if (result != TET_PASS) + break; + + ready = false; + std::thread t(&testNtfStateChange, ntfHandle, ntfSelObj); + testQueue.push(SA_MSG_QUEUE_CAPACITY_REACHED); + testQueue.push(SA_MSG_QUEUE_GROUP_CAPACITY_REACHED); + + /* fill up the q */ + for (SaUint8T i(SA_MSG_MESSAGE_HIGHEST_PRIORITY); + i <= SA_MSG_MESSAGE_LOWEST_PRIORITY; + i++) { + char data[] = "abcd"; + + SaMsgMessageT message = { + 1, 1, sizeof(data) - 1, 0, data, i + }; + + rc = saMsgMessageSend(msgHandle, + &queueGroupName, + &message, + SA_TIME_ONE_SECOND); + assert(rc == SA_AIS_OK); + } + + // wait for the notifications + { + std::unique_lock<std::mutex> lk(m); + cv.wait(lk, []{ return ready; }); + } + + ready = false; + + result = resultQueue.front(); + resultQueue.pop(); + if (result != TET_PASS) + break; + + result = resultQueue.front(); + resultQueue.pop(); + if (result != TET_PASS) + break; + + result = testNtfCleanup(ntfHandle); + if (result != TET_PASS) { + break; + } + + rc = saMsgQueueGroupDelete(msgHandle, &queueGroupName); + assert(rc == SA_AIS_OK); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); + + t.join(); + } while (false); + + test_validate(result, TET_PASS); + + // let the queue close + sleep(2); +} + +static void 
capacityThresholds_24(void) { + int result(TET_PASS); + + do { + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle(openQueue(msgHandle)); + + const SaMsgQueueThresholdsT thresholds = { + { 3, 3, 3, 3 }, { 2, 2, 2, 2 } + }; + + rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds); + assert(rc == SA_AIS_OK); + + rc = saMsgQueueGroupCreate(msgHandle, + &queueGroupName, + SA_MSG_QUEUE_GROUP_ROUND_ROBIN); + assert(rc == SA_AIS_OK); + + rc = saMsgQueueGroupInsert(msgHandle, &queueGroupName, &queueName); + assert(rc == SA_AIS_OK); + + SaNtfHandleT ntfHandle; + SaSelectionObjectT ntfSelObj; + result = testNtfInit(ntfHandle, ntfSelObj); + if (result != TET_PASS) { + break; + } + + ready = false; + std::thread t(&testNtfStateChange, ntfHandle, ntfSelObj); + testQueue.push(SA_MSG_QUEUE_CAPACITY_REACHED); + testQueue.push(SA_MSG_QUEUE_GROUP_CAPACITY_REACHED); + + /* fill up the q */ + for (SaUint8T i(SA_MSG_MESSAGE_HIGHEST_PRIORITY); + i <= SA_MSG_MESSAGE_LOWEST_PRIORITY; + i++) { + char data[] = "abcd"; + + SaMsgMessageT message = { + 1, 1, sizeof(data) - 1, 0, data, i + }; + + rc = saMsgMessageSend(msgHandle, &queueName, &message, SA_TIME_ONE_SECOND); + assert(rc == SA_AIS_OK); + } + + // wait for the notifications + { + std::unique_lock<std::mutex> lk(m); + cv.wait(lk, []{ return ready; }); + } + + result = resultQueue.front(); + resultQueue.pop(); + if (result != TET_PASS) + break; + + result = resultQueue.front(); + resultQueue.pop(); + if (result != TET_PASS) + break; + + ready = false; + + testQueue.push(SA_MSG_QUEUE_CAPACITY_AVAILABLE); + testQueue.push(SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE); + + /* read a message */ + char data[4]; + SaMsgMessageT msg; + SaMsgSenderIdT senderId; + memset(&msg, 0, sizeof(msg)); + msg.data = &data; + msg.size = sizeof(data); + + rc = saMsgMessageGet(queueHandle, &msg, 0, &senderId, SA_TIME_ONE_SECOND); + assert(rc == SA_AIS_OK); + + 
{ + std::unique_lock<std::mutex> lk(m); + cv.wait(lk, []{ return ready; }); + } + + result = resultQueue.front(); + resultQueue.pop(); + if (result != TET_PASS) + break; + + result = resultQueue.front(); + resultQueue.pop(); + if (result != TET_PASS) + break; + + result = testNtfCleanup(ntfHandle); + if (result != TET_PASS) { + break; + } + + rc = saMsgQueueGroupDelete(msgHandle, &queueGroupName); + assert(rc == SA_AIS_OK); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); + + t.join(); + } while (false); + + test_validate(result, TET_PASS); + + // let the queue close + sleep(2); +} + +__attribute__((constructor)) static void capacityThresholds_constructor(void) { + test_suite_add(27, "Capacity Thresholds Test Suite"); + test_case_add(27, + capacityThresholds_01, + "saMsgQueueCapacityThresholdsSet returns BAD_HANDLE when " + "queueHandle is bad"); + test_case_add(27, + capacityThresholds_02, + "saMsgQueueCapacityThresholdsSet with finalized handle"); + test_case_add(27, + capacityThresholds_03, + "saMsgQueueCapacityThresholdsSet with closed queue"); + test_case_add(27, + capacityThresholds_04, + "saMsgQueueCapacityThresholdsSet with null thresholds pointer"); + test_case_add(27, + capacityThresholds_05, + "saMsgQueueCapacityThresholdsSet with capacityAvailable > " + "capacityReached (1)"); + test_case_add(27, + capacityThresholds_06, + "saMsgQueueCapacityThresholdsSet with capacityAvailable > " + "capacityReached (2)"); + test_case_add(27, + capacityThresholds_07, + "saMsgQueueCapacityThresholdsSet with capacityAvailable > " + "capacityReached (3)"); + test_case_add(27, + capacityThresholds_08, + "saMsgQueueCapacityThresholdsSet with capacityAvailable > " + "capacityReached (4)"); + test_case_add(27, + capacityThresholds_09, + "saMsgQueueCapacityThresholdsSet with version != B.03.01"); + test_case_add(27, + capacityThresholds_10, + "saMsgQueueCapacityThresholdsSet with size < capacityReached " + "(1)"); + test_case_add(27, + capacityThresholds_11, + 
"saMsgQueueCapacityThresholdsSet with size < capacityReached " + "(2)"); + test_case_add(27, + capacityThresholds_12, + "saMsgQueueCapacityThresholdsSet with size < capacityReached " + "(3)"); + test_case_add(27, + capacityThresholds_13, + "saMsgQueueCapacityThresholdsSet with size < capacityReached " + "(4)"); + test_case_add(27, + capacityThresholds_14, + "saMsgQueueCapacityThresholdsSet success"); + test_case_add(27, + capacityThresholds_15, + "saMsgQueueCapacityThresholdsGet returns BAD_HANDLE when " + "queueHandle is bad"); + test_case_add(27, + capacityThresholds_16, + "saMsgQueueCapacityThresholdsGet with finalized handle"); + test_case_add(27, + capacityThresholds_17, + "saMsgQueueCapacityThresholdsGet with closed queue"); + test_case_add(27, + capacityThresholds_18, + "saMsgQueueCapacityThresholdsGet with null thresholds pointer"); + test_case_add(27, + capacityThresholds_19, + "saMsgQueueCapacityThresholdsGet with version != B.03.01"); + test_case_add(27, + capacityThresholds_20, + "saMsgQueueCapacityThresholdsGet success"); + test_case_add(27, + capacityThresholds_21, + "Capacity Reached Notification sent"); + test_case_add(27, + capacityThresholds_22, + "Capacity Available Notification sent"); + test_case_add(27, + capacityThresholds_23, + "Capacity Reached Notification sent (group)"); + test_case_add(27, + capacityThresholds_24, + "Capacity Available Notification sent (group)"); +} diff --git a/src/msg/apitest/test_ErrUnavailable.cc b/src/msg/apitest/test_ErrUnavailable.cc index 3f37f8d..dc1b371 100644 --- a/src/msg/apitest/test_ErrUnavailable.cc +++ b/src/msg/apitest/test_ErrUnavailable.cc @@ -504,6 +504,59 @@ static void saErrUnavailable_25(void) assert(rc == SA_AIS_OK); } +static void saErrUnavailable_26(void) +{ + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle; + rc = saMsgQueueOpen(msgHandle, + &queueName, + &creationAttributes, + SA_MSG_QUEUE_CREATE, 
+ SA_TIME_END, + &queueHandle); + assert(rc == SA_AIS_OK); + + lockUnlockNode(true); + const SaMsgQueueThresholdsT thresholds = { + { 5, 5, 5, 5 }, + { 5, 5, 5, 5 } + }; + rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds); + test_validate(rc, SA_AIS_ERR_UNAVAILABLE); + lockUnlockNode(false); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); +} + +static void saErrUnavailable_27(void) +{ + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle; + rc = saMsgQueueOpen(msgHandle, + &queueName, + &creationAttributes, + SA_MSG_QUEUE_CREATE, + SA_TIME_END, + &queueHandle); + assert(rc == SA_AIS_OK); + + lockUnlockNode(true); + SaMsgQueueThresholdsT thresholds; + rc = saMsgQueueCapacityThresholdsGet(queueHandle, &thresholds); + test_validate(rc, SA_AIS_ERR_UNAVAILABLE); + lockUnlockNode(false); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); +} + static void saErrUnavailable_30(void) { lockUnlockNode(true); @@ -940,6 +993,59 @@ static void saErrUnavailable_54(void) assert(rc == SA_AIS_OK); } +static void saErrUnavailable_55(void) +{ + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle; + rc = saMsgQueueOpen(msgHandle, + &queueName, + &creationAttributes, + SA_MSG_QUEUE_CREATE, + SA_TIME_END, + &queueHandle); + assert(rc == SA_AIS_OK); + + lockUnlockNode(true); + lockUnlockNode(false); + const SaMsgQueueThresholdsT thresholds = { + { 5, 5, 5, 5 }, + { 5, 5, 5, 5 } + }; + rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds); + test_validate(rc, SA_AIS_ERR_UNAVAILABLE); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); +} + +static void saErrUnavailable_56(void) +{ + SaMsgHandleT msgHandle; + SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1); + assert(rc == SA_AIS_OK); + + SaMsgQueueHandleT queueHandle; + rc = 
saMsgQueueOpen(msgHandle, + &queueName, + &creationAttributes, + SA_MSG_QUEUE_CREATE, + SA_TIME_END, + &queueHandle); + assert(rc == SA_AIS_OK); + + lockUnlockNode(true); + lockUnlockNode(false); + SaMsgQueueThresholdsT thresholds; + rc = saMsgQueueCapacityThresholdsGet(queueHandle, &thresholds); + test_validate(rc, SA_AIS_ERR_UNAVAILABLE); + + rc = saMsgFinalize(msgHandle); + assert(rc == SA_AIS_OK); +} + __attribute__((constructor)) static void saErrUnavailable_constructor(void) { @@ -969,9 +1075,9 @@ __attribute__((constructor)) static void saErrUnavailable_constructor(void) test_case_add(26, saErrUnavailable_23, "saMsgMessageSendReceive"); test_case_add(26, saErrUnavailable_24, "saMsgMessageReply"); test_case_add(26, saErrUnavailable_25, "saMsgMessageReplyAsync"); -#if 0 test_case_add(26, saErrUnavailable_26, "saMsgQueueCapacityThresholdsSet"); test_case_add(26, saErrUnavailable_27, "saMsgQueueCapacityThresholdsGet"); +#if 0 test_case_add(26, saErrUnavailable_28, "saMsgMetadataSizeGet"); test_case_add(26, saErrUnavailable_29, "saMsgLimitGet"); #endif @@ -1000,9 +1106,9 @@ __attribute__((constructor)) static void saErrUnavailable_constructor(void) test_case_add(26, saErrUnavailable_52, "saMsgMessageSendReceive (stale)"); test_case_add(26, saErrUnavailable_53, "saMsgMessageReply (stale)"); test_case_add(26, saErrUnavailable_54, "saMsgMessageReplyAsync (stale)"); -#if 0 test_case_add(26, saErrUnavailable_55, "saMsgQueueCapacityThresholdsSet (stale)"); test_case_add(26, saErrUnavailable_56, "saMsgQueueCapacityThresholdsGet (stale)"); +#if 0 test_case_add(26, saErrUnavailable_57, "saMsgMetadataSizeGet (stale)"); test_case_add(26, saErrUnavailable_58, "saMsgLimitGet (stale)"); #endif diff --git a/src/msg/common/mqsv_edu.c b/src/msg/common/mqsv_edu.c index 7b0749b..a5301da 100644 --- a/src/msg/common/mqsv_edu.c +++ b/src/msg/common/mqsv_edu.c @@ -477,7 +477,8 @@ static int mqsv_mqp_req_test_type_fnc(NCSCONTEXT arg) LCL_TEST_JUMP_OFFSET_MQP_EVT_TRANSFER_QUEUE_COMPLETE 
= 60, LCL_TEST_JUMP_OFFSET_MQP_EVT_UPDATE_STATS = 62, LCL_TEST_JUMP_OFFSET_MQP_EVT_RET_TIME_SET_REQ = 65, - LCL_TEST_JUMP_OFFSET_MQP_EVT_CLM_NOTIFY = 67 }; + LCL_TEST_JUMP_OFFSET_MQP_EVT_CLM_NOTIFY = 67, + LCL_TEST_JUMP_OFFSET_MQP_EVT_CAPACITY = 68 }; MQP_REQ_TYPE type; if (arg == NULL) @@ -516,6 +517,9 @@ static int mqsv_mqp_req_test_type_fnc(NCSCONTEXT arg) return LCL_TEST_JUMP_OFFSET_MQP_EVT_RET_TIME_SET_REQ; case MQP_EVT_CLM_NOTIFY: return LCL_TEST_JUMP_OFFSET_MQP_EVT_CLM_NOTIFY; + case MQP_EVT_CAP_SET_REQ: + case MQP_EVT_CAP_GET_REQ: + return LCL_TEST_JUMP_OFFSET_MQP_EVT_CAPACITY; default: break; @@ -578,6 +582,55 @@ static uint32_t mqsv_edp_samsgqueuecreationattributest( /***************************************************************************** + PROCEDURE NAME: mqsv_edp_samsgqueuethresholdst + + DESCRIPTION: EDU program handler for "SaMsgQueueThresholdsT" data. This +function is invoked by EDU for performing encode/decode operation on +"SaMsgQueueThresholdsT" data. + + RETURNS: NCSCC_RC_SUCCESS/NCSCC_RC_FAILURE + +*****************************************************************************/ +static uint32_t mqsv_edp_samsgqueuethresholdst( + EDU_HDL *hdl, EDU_TKN *edu_tkn, NCSCONTEXT ptr, uint32_t *ptr_data_len, + EDU_BUF_ENV *buf_env, EDP_OP_TYPE op, EDU_ERR *o_err) +{ + uint32_t rc = NCSCC_RC_SUCCESS; + SaMsgQueueThresholdsT *struct_ptr = NULL, **d_ptr = NULL; + EDU_INST_SET mqsv_samsgqueuethresholds_rules[] = { + {EDU_START, mqsv_edp_samsgqueuethresholdst, 0, 0, 0, + sizeof(SaMsgQueueThresholdsT), 0, NULL}, + {EDU_EXEC, m_NCS_EDP_SASIZET, EDQ_ARRAY, 0, 0, + (long)&((SaMsgQueueThresholdsT *)0)->capacityReached, + SA_MSG_MESSAGE_LOWEST_PRIORITY + 1, NULL}, + {EDU_EXEC, m_NCS_EDP_SASIZET, EDQ_ARRAY, 0, 0, + (long)&((SaMsgQueueThresholdsT *)0)->capacityAvailable, + SA_MSG_MESSAGE_LOWEST_PRIORITY + 1, NULL}, + {EDU_END, 0, 0, 0, 0, 0, 0, NULL}, + }; + + if (op == EDP_OP_TYPE_ENC) { + struct_ptr = (SaMsgQueueThresholdsT *)ptr; + } else if (op == 
EDP_OP_TYPE_DEC) { + d_ptr = (SaMsgQueueThresholdsT **)ptr; + if (*d_ptr == NULL) { + /* This should have already been a valid pointer. */ + *o_err = EDU_ERR_MEM_FAIL; + return NCSCC_RC_FAILURE; + } + memset(*d_ptr, '\0', sizeof(SaMsgQueueThresholdsT)); + struct_ptr = *d_ptr; + } else { + struct_ptr = ptr; + } + rc = m_NCS_EDU_RUN_RULES(hdl, edu_tkn, + mqsv_samsgqueuethresholds_rules, + struct_ptr, ptr_data_len, buf_env, op, o_err); + return rc; +} + +/***************************************************************************** + PROCEDURE NAME: mqsv_edp_mqp_req DESCRIPTION: EDU program handler for "MQP_REQ_MSG" data. This @@ -802,6 +855,13 @@ static uint32_t mqsv_edp_mqp_req(EDU_HDL *hdl, EDU_TKN *edu_tkn, NCSCONTEXT ptr, {EDU_EXEC, ncs_edp_uns32, 0, 0, 0, (long)&((MQP_REQ_MSG *)0)->info.clmNotify.node_joined, 0, NULL}, + /* MQP_EVT_CAP_REQ */ + {EDU_EXEC, m_NCS_EDP_SAMSGQUEUEHANDLET, 0, 0, 0, + (long)&((MQP_REQ_MSG *)0)->info.capacity.queueHandle, 0, NULL}, + {EDU_EXEC, mqsv_edp_samsgqueuethresholdst, 0, 0, 0, + (long)&((MQP_REQ_MSG *)0)->info.capacity.thresholds, + 0, NULL}, + {EDU_END, 0, 0, 0, 0, 0, 0, NULL}, }; @@ -956,7 +1016,7 @@ static int mqsv_mqp_rsp_test_type_fnc(NCSCONTEXT arg) LCL_TEST_JUMP_OFFSET_MQP_EVT_TRANSFER_QUEUE_RSP, LCL_TEST_JUMP_OFFSET_MQP_EVT_ND_RESTART_RSP = 29, LCL_TEST_JUMP_OFFSET_MQP_EVT_RET_TIME_SET_RSP, - + LCL_TEST_JUMP_OFFSET_MQP_EVT_CAP_RSP = 32 }; MQP_RSP_TYPE type; @@ -988,6 +1048,9 @@ static int mqsv_mqp_rsp_test_type_fnc(NCSCONTEXT arg) return LCL_TEST_JUMP_OFFSET_MQP_EVT_ND_RESTART_RSP; case MQP_EVT_Q_RET_TIME_SET_RSP: return LCL_TEST_JUMP_OFFSET_MQP_EVT_RET_TIME_SET_RSP; + case MQP_EVT_CAP_SET_RSP: + case MQP_EVT_CAP_GET_RSP: + return LCL_TEST_JUMP_OFFSET_MQP_EVT_CAP_RSP; default: break; } @@ -1109,6 +1172,13 @@ static uint32_t mqsv_edp_mqp_rsp(EDU_HDL *hdl, EDU_TKN *edu_tkn, NCSCONTEXT ptr, (long)&((MQP_RSP_MSG *)0)->info.retTimeSetRsp.queueHandle, 0, NULL}, + /* MQP_EVT_CAP_RSP */ + {EDU_EXEC, m_NCS_EDP_SAMSGQUEUEHANDLET, 
0, 0, 0, + (long)&((MQP_RSP_MSG *)0)->info.capacity.queueHandle, 0, NULL}, + {EDU_EXEC, mqsv_edp_samsgqueuethresholdst, 0, 0, 0, + (long)&((MQP_RSP_MSG *)0)->info.capacity.thresholds, + 0, NULL}, + {EDU_END, 0, 0, 0, 0, 0, 0, NULL}, }; diff --git a/src/msg/common/mqsv_evt.h b/src/msg/common/mqsv_evt.h index ef7c739..4e26bf6 100644 --- a/src/msg/common/mqsv_evt.h +++ b/src/msg/common/mqsv_evt.h @@ -62,7 +62,9 @@ typedef enum mqp_req_type { MQP_EVT_SEND_MSG_ASYNC, MQP_EVT_CB_DUMP, /* For CPND CB Dump */ MQP_EVT_Q_RET_TIME_SET_REQ, - MQP_EVT_CLM_NOTIFY + MQP_EVT_CLM_NOTIFY, + MQP_EVT_CAP_SET_REQ, + MQP_EVT_CAP_GET_REQ } MQP_REQ_TYPE; /* Enums for MQP Message Types */ @@ -81,7 +83,9 @@ typedef enum mqp_rsp_type { MQP_EVT_TRANSFER_QUEUE_RSP, MQP_EVT_MQND_RESTART_RSP, MQP_EVT_STAT_UPD_RSP, - MQP_EVT_Q_RET_TIME_SET_RSP + MQP_EVT_Q_RET_TIME_SET_RSP, + MQP_EVT_CAP_SET_RSP, + MQP_EVT_CAP_GET_RSP } MQP_RSP_TYPE; /* Enums for MQD messages */ @@ -188,6 +192,11 @@ typedef struct mqp_q_ret_time_set_rsp { SaMsgQueueHandleT queueHandle; } MQP_Q_RET_TIME_SET_RSP; +typedef struct mqp_capacity { + SaMsgQueueHandleT queueHandle; + SaMsgQueueThresholdsT thresholds; +} MQP_CAPACITY; + /* saMsgQueueOpen(): */ typedef struct mqp_open_req { @@ -396,6 +405,7 @@ typedef struct mqp_req_msg { MQP_UPDATE_STATS statsReq; MQP_Q_RET_TIME_SET_REQ retTimeSetReq; MQP_CLM_NOTIFY clmNotify; + MQP_CAPACITY capacity; } info; } MQP_REQ_MSG; @@ -415,6 +425,7 @@ typedef struct mqp_rsp_msg { MQP_QUEUE_REPLY_RSP replyRsp; MQP_SEND_MSG_RSP sendMsgRsp; MQP_Q_RET_TIME_SET_RSP retTimeSetRsp; + MQP_CAPACITY capacity; } info; } MQP_RSP_MSG; diff --git a/src/msg/msgd/mqd_api.c b/src/msg/msgd/mqd_api.c index 8416851..83d5c21 100644 --- a/src/msg/msgd/mqd_api.c +++ b/src/msg/msgd/mqd_api.c @@ -49,10 +49,11 @@ #include "msg/msgd/mqd.h" #include "mqd_imm.h" +#include "mqd_ntf.h" MQDLIB_INFO gl_mqdinfo; -enum { FD_TERM = 0, FD_AMF, FD_MBCSV, FD_MBX, FD_CLM, NUM_FD }; +enum { FD_TERM = 0, FD_AMF, FD_MBCSV, FD_MBX, FD_CLM, 
FD_NTF, NUM_FD }; static struct pollfd fds[NUM_FD]; static nfds_t nfds = NUM_FD; @@ -146,12 +147,50 @@ static SaAisErrorT mqd_clm_init(MQD_CB *cb) TRACE_1("saClmClusterTrack success"); } while (false); - if (saErr != SA_AIS_OK && !cb->clm_hdl) + if (saErr != SA_AIS_OK && cb->clm_hdl) saClmFinalize(cb->clm_hdl); return saErr; } +static SaAisErrorT mqd_ntf_init(MQD_CB *cb) +{ + SaAisErrorT rc = SA_AIS_OK; + TRACE_ENTER(); + + do { + SaVersionT ntfVersion = { 'A', 1, 1 }; + SaNtfCallbacksT callbacks = { + mqdNtfNotificationCallback, + 0 + }; + + rc = saNtfInitialize(&cb->ntfHandle, &callbacks, &ntfVersion); + if (rc != SA_AIS_OK) { + LOG_ER("saNtfInitialize failed with error %u", rc); + break; + } + + rc = saNtfSelectionObjectGet(cb->ntfHandle, &cb->ntf_sel_obj); + if (SA_AIS_OK != rc) { + LOG_ER("saNtfSelectionObjectGet failed with error %u", + rc); + break; + } + + rc = mqdInitNtfSubscriptions(cb->ntfHandle); + if (rc != SA_AIS_OK) + break; + } while (false); + + if (rc != SA_AIS_OK && cb->ntfHandle) + saNtfFinalize(cb->ntfHandle); + + TRACE_LEAVE(); + + return rc; +} + /****************************************************************************\ PROCEDURE NAME : mqd_lib_init @@ -306,6 +345,16 @@ static uint32_t mqd_lib_init(void) mqd_cb_shut(pMqd); return NCSCC_RC_FAILURE; } + + if (mqd_ntf_init(pMqd) != SA_AIS_OK) { + mqd_asapi_unbind(); + saAmfFinalize(pMqd->amf_hdl); + saClmFinalize(pMqd->clm_hdl); + ncshm_give_hdl(pMqd->hdl); + mqd_cb_shut(pMqd); + return NCSCC_RC_FAILURE; + } + if ((rc = initialize_for_assignment(pMqd, pMqd->ha_state)) != NCSCC_RC_SUCCESS) { LOG_ER("initialize_for_assignment FAILED %u", (unsigned)rc); @@ -493,6 +542,8 @@ void mqd_main_process(uint32_t hdl) fds[FD_CLM].events = POLLIN; fds[FD_MBCSV].fd = pMqd->mbcsv_sel_obj; fds[FD_MBCSV].events = POLLIN; + fds[FD_NTF].fd = pMqd->ntf_sel_obj; + fds[FD_NTF].events = POLLIN; int ret = poll(fds, nfds, -1); if (ret == -1) { @@ -522,6 +573,15 @@ void mqd_main_process(uint32_t hdl) 
LOG_ER("saClmDispatch failed: %u", err); } } + + if (fds[FD_NTF].revents & POLLIN) { + /* dispatch all the NTF pending function */ + err = saNtfDispatch(pMqd->ntfHandle, SA_DISPATCH_ALL); + if (err != SA_AIS_OK) { + LOG_ER("saNtfDispatch failed: %u", err); + } + } + /* dispatch all the MBCSV pending callbacks */ if (fds[FD_MBCSV].revents & POLLIN) { mbcsv_arg.i_op = NCS_MBCSV_OP_DISPATCH; diff --git a/src/msg/msgd/mqd_db.h b/src/msg/msgd/mqd_db.h index 022b03a..93dc18f 100644 --- a/src/msg/msgd/mqd_db.h +++ b/src/msg/msgd/mqd_db.h @@ -34,6 +34,7 @@ #include <stdbool.h> #include <saClm.h> #include <saImmOi.h> +#include <saNtf.h> #define MQSV_MQD_MBCSV_VERSION 1 #define MQSV_MQD_MBCSV_VERSION_MIN 1 @@ -91,6 +92,7 @@ typedef struct mqd_qinfo { NCS_QUEUE ilist; /* Queue/Group element list */ NCS_QUEUE tlist; /* List of the user who opted track */ SaTimeT creationTime; + bool capacityReached; } MQD_OBJ_INFO; typedef struct mqd_object_elem { @@ -146,6 +148,7 @@ typedef struct mqd_cb { SaClmHandleT clm_hdl; SaAmfHAStateT ha_state; /* Present AMF HA state of the component */ SaNameT comp_name; + SaNtfHandleT ntfHandle; NCS_MBCSV_HDL mbcsv_hdl; /*MBCSV handle for Redundancy of initialize */ NCS_MBCSV_CKPT_HDL o_ckpt_hdl; /*Opened Checkpoint Handle */ @@ -171,6 +174,7 @@ typedef struct mqd_cb { SaSelectionObjectT imm_sel_obj; /*Selection object to wait for IMM events */ SaSelectionObjectT clm_sel_obj; + SaSelectionObjectT ntf_sel_obj; bool fully_initialized; } MQD_CB; diff --git a/src/msg/msgd/mqd_ntf.cc b/src/msg/msgd/mqd_ntf.cc new file mode 100644 index 0000000..ab29193 --- /dev/null +++ b/src/msg/msgd/mqd_ntf.cc @@ -0,0 +1,310 @@ +/* -*- OpenSAF -*- + * + * (C) Copyright 2017 The OpenSAF Foundation + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. 
This file and program are licensed + * under the GNU Lesser General Public License Version 2.1, February 1999. + * The complete license can be accessed from the following location: + * https://urldefense.proofpoint.com/v2/url?u=http-3A__opensource.org_licenses_lgpl-2Dlicense.php&d=DwIF-g&c=RoP1YumCXCgaWHvlZYR8PZh8Bv7qIrMUB65eapI_JnE&r=rU6x356sikQZSi7Ttc2DuiqAgbc0QIeANg72N5AllVc&m=zb33_QGOVBVMiOqaTZ4TDnD1K3tft2hmzm4Zl_-Ak9E&s=zW1Da85bMtBUT-FnbCAff4Qx8AxM9-IibHRtx7PHWNs&e= + * See the Copying file included with the OpenSAF distribution for full + * licensing terms. + * + * Author(s): Genband + * + */ +#include <cstring> +#include <vector> +#include <saMsg.h> +#include "base/ncs_edu_pub.h" +#include "base/logtrace.h" +#include "base/ncs_queue.h" +#include "base/ncspatricia.h" +#include "base/ncssysf_tmr.h" +#include "mbc/mbcsv_papi.h" +#include "mds/mds_papi.h" +#include "msg/common/mqsv_def.h" +#include "msg/common/mqsv_asapi.h" +#include "mqd_tmr.h" +#include "mqd_db.h" +#include "mqd_dl_api.h" +#include "mqd_ntf.h" + +extern MQDLIB_INFO gl_mqdinfo; + +static const int operStateSubId = 1; + +static void sendStateChangeNotification(MQD_CB *cb, + const SaNameT& queueGroup, + SaUint16T status) { + TRACE_ENTER(); + + do { + if (status == SA_MSG_QUEUE_CAPACITY_REACHED) + status = SA_MSG_QUEUE_GROUP_CAPACITY_REACHED; + else if (status == SA_MSG_QUEUE_CAPACITY_AVAILABLE) + status = SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE; + + const SaNameT notifyingObject = { + sizeof("safApp=safMsgService") - 1, + "safApp=safMsgService" + }; + + SaNtfStateChangeNotificationT ntfStateChange = { 0 }; + + SaAisErrorT rc(saNtfStateChangeNotificationAllocate(cb->ntfHandle, + &ntfStateChange, + 1, + 0, + 0, + 1, + 0)); + + if (rc != SA_AIS_OK) { + LOG_ER("saNtfStateChangeNotificationAllocate " "failed: %i", rc); + break; + } + + *ntfStateChange.notificationHeader.eventType = SA_NTF_OBJECT_STATE_CHANGE; + + memcpy(ntfStateChange.notificationHeader.notificationObject, + &queueGroup, + 
sizeof(SaNameT)); + + memcpy(ntfStateChange.notificationHeader.notifyingObject, + &notifyingObject, + sizeof(SaNameT)); + + ntfStateChange.notificationHeader.notificationClassId->vendorId = + SA_NTF_VENDOR_ID_SAF; + ntfStateChange.notificationHeader.notificationClassId->majorId = + SA_SVC_MSG; + ntfStateChange.notificationHeader.notificationClassId->minorId = + (status == SA_MSG_QUEUE_GROUP_CAPACITY_REACHED) ? 0x67 : 0x68; + + *ntfStateChange.sourceIndicator = SA_NTF_OBJECT_OPERATION; + + ntfStateChange.changedStates[0].stateId = SA_MSG_DEST_CAPACITY_STATUS; + + ntfStateChange.changedStates[0].oldStatePresent = + (status == SA_MSG_QUEUE_GROUP_CAPACITY_REACHED) ? SA_FALSE : SA_TRUE; + + if (status == SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE) { + ntfStateChange.changedStates[0].oldState = + SA_MSG_QUEUE_GROUP_CAPACITY_REACHED; + } + + ntfStateChange.changedStates[0].newState = status; + + rc = saNtfNotificationSend(ntfStateChange.notificationHandle); + + if (rc != SA_AIS_OK) + LOG_ER("saNtfNotificationSend failed: %i", rc); + + rc = saNtfNotificationFree(ntfStateChange.notificationHandle); + + if (rc != SA_AIS_OK) + LOG_ER("saNtfNotificationFree failed: %i", rc); + } while (false); + + TRACE_LEAVE(); +} + +static void updateQueueGroup(MQD_CB *cb, + const SaNameT& queueName, + SaUint16T status) { + TRACE_ENTER(); + + bool member(false), allFull(true); + typedef std::vector<SaNameT> GroupList; + GroupList groupList; + + // maybe there is a better way to do this? 
+ for (MQD_OBJ_NODE *objNode(reinterpret_cast<MQD_OBJ_NODE *> + (ncs_patricia_tree_getnext(&cb->qdb, 0))); + objNode; + objNode = reinterpret_cast<MQD_OBJ_NODE *>(ncs_patricia_tree_getnext( + &cb->qdb, + reinterpret_cast<uint8_t *>(&objNode->oinfo.name)))) { + + if (objNode->oinfo.type == MQSV_OBJ_QGROUP) { + NCS_Q_ITR itr = { 0 }; + + for (MQD_OBJECT_ELEM *objElem(static_cast<MQD_OBJECT_ELEM *> + (ncs_queue_get_next(&objNode->oinfo.ilist, &itr))); + objElem; + objElem = static_cast<MQD_OBJECT_ELEM *> + (ncs_queue_get_next(&objNode->oinfo.ilist, &itr))) { + if (objElem->pObject->type == MQSV_OBJ_QUEUE) { + if (queueName.length == objElem->pObject->name.length && + !memcmp(queueName.value, + objElem->pObject->name.value, + queueName.length)) { + member = true; + if (status == SA_MSG_QUEUE_CAPACITY_REACHED) + objElem->pObject->capacityReached = true; + else if (status == SA_MSG_QUEUE_CAPACITY_AVAILABLE) + objElem->pObject->capacityReached = false; + + groupList.push_back(objNode->oinfo.name); + } else if (!objElem->pObject->capacityReached) { + allFull = false; + } + } + } + } + } + + if (member && ((status == SA_MSG_QUEUE_CAPACITY_REACHED && allFull) || + (status == SA_MSG_QUEUE_CAPACITY_AVAILABLE && allFull))) { + for (auto& it : groupList) + sendStateChangeNotification(cb, it, status); + } else { + // don't care about anything else + } + + TRACE_LEAVE(); +} + +static bool isMSGStateChange(const SaNtfClassIdT &classId) { + TRACE_ENTER(); + bool status(false); + + if (classId.vendorId == SA_NTF_VENDOR_ID_SAF && + classId.majorId == SA_SVC_MSG) { + status = true; + } + + TRACE_LEAVE2("%i", status); + return status; +} + +static void handleMsgObjectStateChangeNotification( + MQD_CB *cb, + const SaNtfStateChangeNotificationT& stateChangeNotification) { + TRACE_ENTER(); + + do { + if (*stateChangeNotification.sourceIndicator != SA_NTF_OBJECT_OPERATION) { + LOG_ER("expected object operation -- got %i", + *stateChangeNotification.sourceIndicator); + break; + } + + if 
(stateChangeNotification.numStateChanges != 1) { + LOG_ER("expected one state change -- got %i", + stateChangeNotification.numStateChanges); + break; + } + + if (stateChangeNotification.changedStates[0].stateId != + SA_MSG_DEST_CAPACITY_STATUS) { + LOG_ER("unknown state id: %i", + stateChangeNotification.changedStates[0].stateId); + break; + } + + // if all the queues for a group are full then send a notification + updateQueueGroup( + cb, + *stateChangeNotification.notificationHeader.notificationObject, + stateChangeNotification.changedStates[0].newState); + } while (false); + + TRACE_LEAVE(); +} + +static void handleStateChangeNotification( + MQD_CB *cb, + const SaNtfStateChangeNotificationT& stateChangeNotification) { + TRACE_ENTER(); + + if (stateChangeNotification.notificationHeader.notificationClassId && + isMSGStateChange( + *stateChangeNotification.notificationHeader.notificationClassId)) { + handleMsgObjectStateChangeNotification(cb, stateChangeNotification); + } else { + TRACE("ignoring non-MSG state change notification"); + } + + TRACE_LEAVE(); +} + +void mqdNtfNotificationCallback(SaNtfSubscriptionIdT subscriptionId, + const SaNtfNotificationsT *notification) { + TRACE_ENTER(); + MQD_CB *cb(static_cast<MQD_CB *>(ncshm_take_hdl(NCS_SERVICE_ID_MQD, + gl_mqdinfo.inst_hdl))); + + if (!cb) { + LOG_ER("can't get mqd control block"); + } + + do { + if (cb->ha_state != SA_AMF_HA_ACTIVE) + break; + + if (subscriptionId != operStateSubId) { + TRACE("unknown subscription id received in mqdNtfNotificationCallback: " + "%d", + subscriptionId); + break; + } + + if (notification->notificationType == SA_NTF_TYPE_STATE_CHANGE) { + handleStateChangeNotification( + cb, + notification->notification.stateChangeNotification); + } else { + TRACE("ignoring NTF notification of type: %i", + notification->notificationType); + } + } while (false); + + ncshm_give_hdl(cb->hdl); + + TRACE_LEAVE(); +} + +SaAisErrorT mqdInitNtfSubscriptions(SaNtfHandleT ntfHandle) { + TRACE_ENTER(); + 
SaAisErrorT rc(SA_AIS_OK); + + do { + SaNtfStateChangeNotificationFilterT filter; + + rc = saNtfStateChangeNotificationFilterAllocate( + ntfHandle, + &filter, 0, 0, 0, 0, 0, 0); + + if (rc != SA_AIS_OK) { + LOG_ER("saNtfAttributeChangeNotificationFilterAllocate" + " failed: %i", + rc); + break; + } + + SaNtfNotificationTypeFilterHandlesT filterHandles = { + 0, 0, filter.notificationFilterHandle, 0, 0 + }; + + rc = saNtfNotificationSubscribe(&filterHandles, + operStateSubId); + + if (rc != SA_AIS_OK) { + LOG_ER("saNtfNotificationSubscribe failed: %i", rc); + break; + } + + rc = saNtfNotificationFilterFree(filter.notificationFilterHandle); + + if (rc != SA_AIS_OK) { + LOG_ER("saNtfNotificationFilterFree failed: %i", rc); + break; + } + } while (false); + + TRACE_LEAVE(); + return rc; +} diff --git a/src/msg/msgd/mqd_ntf.h b/src/msg/msgd/mqd_ntf.h new file mode 100644 index 0000000..4d9c0f1 --- /dev/null +++ b/src/msg/msgd/mqd_ntf.h @@ -0,0 +1,49 @@ +/* -*- OpenSAF -*- + * + * (C) Copyright 2017 The OpenSAF Foundation + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed + * under the GNU Lesser General Public License Version 2.1, February 1999. + * The complete license can be accessed from the following location: + * https://urldefense.proofpoint.com/v2/url?u=http-3A__opensource.org_licenses_lgpl-2Dlicense.php&d=DwIF-g&c=RoP1YumCXCgaWHvlZYR8PZh8Bv7qIrMUB65eapI_JnE&r=rU6x356sikQZSi7Ttc2DuiqAgbc0QIeANg72N5AllVc&m=zb33_QGOVBVMiOqaTZ4TDnD1K3tft2hmzm4Zl_-Ak9E&s=zW1Da85bMtBUT-FnbCAff4Qx8AxM9-IibHRtx7PHWNs&e= + * See the Copying file included with the OpenSAF distribution for full + * licensing terms. + * + * Author(s): Genband + * + */ + +/***************************************************************************** +.............................................................................. 
+ + FILE NAME: mqd_ntf.h + +.............................................................................. + + DESCRIPTION: + + MQD Ntf Structures & Parameters. + +******************************************************************************/ + +#ifndef MSG_MSGD_MQD_NTF_H_ +#define MSG_MSGD_MQD_NTF_H_ + +#include <saNtf.h> + +#ifdef __cplusplus +extern "C" { +#endif + +void mqdNtfNotificationCallback(SaNtfSubscriptionIdT subscriptionId, + const SaNtfNotificationsT *notification); + +SaAisErrorT mqdInitNtfSubscriptions(SaNtfHandleT); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/msg/msgnd/mqnd_db.h b/src/msg/msgnd/mqnd_db.h index d04566e..fc7d926 100644 --- a/src/msg/msgnd/mqnd_db.h +++ b/src/msg/msgnd/mqnd_db.h @@ -74,6 +74,7 @@ typedef struct mqnd_queue_info { SaMsgQueueHandleT listenerHandle; /* Listener queue handle */ SaNameT queueName; SaSizeT size[SA_MSG_MESSAGE_LOWEST_PRIORITY + 1]; /* Size of the queue */ + SaMsgQueueThresholdsT thresholds; SaMsgQueueStatusT queueStatus; /* Status info of the queue */ SaMsgQueueSendingStateT sendingState; /* Sending state is removed from B.1.1, but used internally */ @@ -91,6 +92,8 @@ typedef struct mqnd_queue_info { uint32_t numberOfFullErrors[SA_MSG_MESSAGE_LOWEST_PRIORITY + 1]; SaUint64T totalQueueSize; uint32_t shm_queue_index; + bool capacityReachedSent; + bool capacityAvailableSent; } MQND_QUEUE_INFO; typedef struct mqnd_qhndl_node { @@ -127,6 +130,7 @@ typedef struct mqnd_queue_ckpt_info { SaMsgQueueHandleT listenerHandle; /* Listener queue handle */ SaNameT queueName; SaSizeT size[SA_MSG_MESSAGE_LOWEST_PRIORITY + 1]; /* Size of the queue */ + SaMsgQueueThresholdsT thresholds; SaMsgQueueCreationFlagsT creationFlags; SaTimeT creationTime; /* Queue Creation time */ SaTimeT retentionTime; @@ -145,6 +149,8 @@ typedef struct mqnd_queue_ckpt_info { QueueStatsShm; /* Structure to store queue stats in shared memory */ MQND_TMR qtransfer_complete_tmr; /* Q Transfer Complete Timer */ uint32_t valid; /* To verify 
whether the checkpoint info is valid or not */ + bool capacityReachedSent; + bool capacityAvailableSent; } MQND_QUEUE_CKPT_INFO; typedef struct mqnd_shm_info { diff --git a/src/msg/msgnd/mqnd_evt.c b/src/msg/msgnd/mqnd_evt.c index 0c2a612..fcbe4d6 100644 --- a/src/msg/msgnd/mqnd_evt.c +++ b/src/msg/msgnd/mqnd_evt.c @@ -42,6 +42,8 @@ static uint32_t mqnd_evt_proc_update_stats_shm(MQND_CB *cb, MQSV_DSEND_EVT *evt); static uint32_t mqnd_evt_proc_cb_dump(void); static uint32_t mqnd_evt_proc_ret_time_set(MQND_CB *cb, MQSV_EVT *evt); +static uint32_t mqnd_evt_proc_cap_set(MQND_CB *, MQSV_EVT *); +static uint32_t mqnd_evt_proc_cap_get(MQND_CB *, MQSV_EVT *); static void mqnd_dump_queue_status(MQND_CB *cb, SaMsgQueueStatusT *queueStatus, uint32_t offset); static void mqnd_dump_timer_info(MQND_TMR tmr); @@ -186,6 +188,12 @@ static uint32_t mqnd_proc_mqp_req_msg(MQND_CB *cb, MQSV_EVT *evt) case MQP_EVT_Q_RET_TIME_SET_REQ: rc = mqnd_evt_proc_ret_time_set(cb, evt); break; + case MQP_EVT_CAP_SET_REQ: + rc = mqnd_evt_proc_cap_set(cb, evt); + break; + case MQP_EVT_CAP_GET_REQ: + rc = mqnd_evt_proc_cap_get(cb, evt); + break; default: LOG_ER("%s:%u: unrecognized message type: %d", __FILE__, __LINE__, evt->msg.mqp_req.type); @@ -946,6 +954,191 @@ send_rsp: } /**************************************************************************** + * Name : sendStateChangeNotification + * + * Description : Send state change notification for queues + * + * Arguments : MQND_CB *cb - MQND CB pointer + * MQND_QUEUE_INFO *qInfo - queue info struct + * SaMsgMessageCapacityStatusT - status to send + * + * Return Values : None. + * + * Notes : None. + *****************************************************************************/ +static void sendStateChangeNotification(MQND_CB *cb, + MQND_QUEUE_INFO *qInfo, + SaMsgMessageCapacityStatusT status) +{ + SaNtfHandleT ntfHandle = 0; + + do { + SaVersionT ntfVersion = { 'A', 1, 1 }; + + /* do we need to send it? 
*/ + if ((status == SA_MSG_QUEUE_CAPACITY_REACHED && + qInfo->capacityReachedSent) || + (status == SA_MSG_QUEUE_CAPACITY_AVAILABLE && + qInfo->capacityAvailableSent)) { + break; + } + + SaAisErrorT rc = saNtfInitialize(&ntfHandle, 0, &ntfVersion); + + if (rc != SA_AIS_OK) { + LOG_ER("saNtfInitialize failed: %i", rc); + break; + } + + const SaNameT notifyingObject = { + sizeof("safApp=safMsgService") - 1, + "safApp=safMsgService" + }; + + SaNtfStateChangeNotificationT ntfStateChange = { 0 }; + + rc = saNtfStateChangeNotificationAllocate( + ntfHandle, + &ntfStateChange, + 1, /* numCorrelated Notifications */ + 0, /* length addition text */ + 0, /* num additional info */ + 1, /* num of state changes */ + 0); /* variable data size */ + + if (rc != SA_AIS_OK) { + LOG_ER("saNtfStateChangeNotificationAllocate " + "failed: %i", rc); + break; + } + + *ntfStateChange.notificationHeader.eventType = + SA_NTF_OBJECT_STATE_CHANGE; + + memcpy(ntfStateChange.notificationHeader.notificationObject, + &qInfo->queueName, + sizeof(SaNameT)); + + memcpy(ntfStateChange.notificationHeader.notifyingObject, + &notifyingObject, + sizeof(SaNameT)); + + ntfStateChange.notificationHeader.notificationClassId->vendorId = + SA_NTF_VENDOR_ID_SAF; + ntfStateChange.notificationHeader.notificationClassId->majorId = + SA_SVC_MSG; + ntfStateChange.notificationHeader.notificationClassId->minorId = + (status == SA_MSG_QUEUE_CAPACITY_REACHED) ? + 0x65 : 0x66; + + *ntfStateChange.sourceIndicator = SA_NTF_OBJECT_OPERATION; + + ntfStateChange.changedStates[0].stateId = + SA_MSG_DEST_CAPACITY_STATUS; + + ntfStateChange.changedStates[0].oldStatePresent = + (status == SA_MSG_QUEUE_CAPACITY_REACHED) ? 
+ SA_FALSE : SA_TRUE; + + if (status == SA_MSG_QUEUE_CAPACITY_AVAILABLE) { + ntfStateChange.changedStates[0].oldState = + SA_MSG_QUEUE_CAPACITY_REACHED; + } + + ntfStateChange.changedStates[0].newState = status; + + rc = saNtfNotificationSend(ntfStateChange.notificationHandle); + + if (rc != SA_AIS_OK) + LOG_ER("saNtfNotificationSend failed: %i", rc); + else { + if (status == SA_MSG_QUEUE_CAPACITY_REACHED) { + qInfo->capacityReachedSent = true; + qInfo->capacityAvailableSent = false; + } else { + qInfo->capacityReachedSent = false; + qInfo->capacityAvailableSent = true; + } + } + + rc = saNtfNotificationFree(ntfStateChange.notificationHandle); + + if (rc != SA_AIS_OK) + LOG_ER("saNtfNotificationFree failed: %i", rc); + } while (false); + + if (ntfHandle) { + SaAisErrorT rc = saNtfFinalize(ntfHandle); + + if (rc != SA_AIS_OK) + LOG_ER("saNtfFinalize failed: %i", rc); + } +} + +/**************************************************************************** + * Name : checkCapacity + * + * Description : Function to check whether we need to send state change + * notification. + * + * Arguments : MQND_CB *cb - MQND CB pointer + * MQND_QUEUE_INFO *qInfo - queue info + * + * Return Values : None. + * + * Notes : None. + *****************************************************************************/ +static void checkCapacity(MQND_CB *cb, MQND_QUEUE_INFO *qInfo) +{ + int i; + uint32_t offset = qInfo->shm_queue_index; + MQND_QUEUE_CKPT_INFO *shm_base_addr = cb->mqnd_shm.shm_base_addr; + bool sendNotification = true; + SaMsgMessageCapacityStatusT capacityType = + SA_MSG_QUEUE_CAPACITY_REACHED; + + TRACE_ENTER(); + + /* send capacity notifications */ + for (i = SA_MSG_MESSAGE_HIGHEST_PRIORITY; + i <= SA_MSG_MESSAGE_LOWEST_PRIORITY; + i++) + { + TRACE("capacityReached[%i]: %llu capacityAvailable: %llu " + "queueUsed: %llu", + i, + qInfo->thresholds.capacityReached[i], + qInfo->thresholds.capacityAvailable[i], + shm_base_addr[offset].QueueStatsShm. 
+ saMsgQueueUsage[i].queueUsed); + if (!qInfo->capacityReachedSent && + qInfo->thresholds.capacityReached[i] >= + shm_base_addr[offset].QueueStatsShm. + saMsgQueueUsage[i].queueUsed) { + /* only send notification if all are full */ + sendNotification = false; + TRACE("not sending notification"); + break; + } else if (!qInfo->capacityAvailableSent && + qInfo->thresholds.capacityAvailable[i] > + shm_base_addr[offset].QueueStatsShm. + saMsgQueueUsage[i].queueUsed) { + /* send notification if at least one comes available */ + TRACE("sending available notification"); + sendNotification = true; + capacityType = SA_MSG_QUEUE_CAPACITY_AVAILABLE; + break; + } + } + + TRACE("sendNotification: %i", sendNotification); + if (sendNotification) + sendStateChangeNotification(cb, qInfo, capacityType); + + TRACE_LEAVE(); +} + +/**************************************************************************** * Name : mqnd_evt_proc_update_stats_shm * * Description : Function to update stats of queue in shm when message is @@ -998,7 +1191,10 @@ static uint32_t mqnd_evt_proc_update_stats_shm(MQND_CB *cb, MQSV_DSEND_EVT *evt) statsReq = &evt->info.statsReq; - if (cb->is_restart_done) + if (!cb->clm_node_joined) { + err = SA_AIS_ERR_UNAVAILABLE; + goto done; + } else if (cb->is_restart_done) err = SA_AIS_OK; else { err = SA_AIS_ERR_TRY_AGAIN; @@ -1039,6 +1235,8 @@ static uint32_t mqnd_evt_proc_update_stats_shm(MQND_CB *cb, MQSV_DSEND_EVT *evt) goto done; } + checkCapacity(cb, &qnode->qinfo); + done: direct_rsp_evt = @@ -1292,6 +1490,9 @@ static uint32_t mqnd_evt_proc_send_msg(MQND_CB *cb, MQSV_DSEND_EVT *evt) mqnd_send_msg_update_stats_shm(cb, qnode, snd_msg->message.size, snd_msg->message.priority); + + checkCapacity(cb, &qnode->qinfo); + send_resp: /* If the error happens in SendReceive case while sending the message then return the response from here and don't wait for the reply @@ -1741,6 +1942,162 @@ send_rsp: } 
/**************************************************************************** + * Name : mqnd_evt_proc_cap_set + * + * Description : Function to set capacity thresholds of queue + * + * Arguments : + * + * Return Values : NCSCC_RC_SUCCESS/Error. + * + * Notes : None. + *****************************************************************************/ +static uint32_t mqnd_evt_proc_cap_set(MQND_CB *cb, MQSV_EVT *evt) +{ + SaAisErrorT err = SA_AIS_OK; + MQSV_EVT rsp_evt; + uint32_t rc = NCSCC_RC_SUCCESS; + TRACE_ENTER(); + + do { + MQND_QUEUE_NODE *qnode = NULL; + int i; + + if (!cb->clm_node_joined) { + err = SA_AIS_ERR_UNAVAILABLE; + break; + } else if (cb->is_restart_done) { + err = SA_AIS_OK; + } else { + LOG_ER("%s:%u: ERR_TRY_AGAIN: MQND is not completely " + "Initialized", + __FILE__, __LINE__); + err = SA_AIS_ERR_TRY_AGAIN; + break; + } + + mqnd_queue_node_get(cb, + evt->msg.mqp_req.info.retTimeSetReq.queueHandle, + &qnode); + + /* If queue not found */ + if (!qnode) { + LOG_ER("ERR_BAD_HANDLE: Get queue node Failed"); + err = SA_AIS_ERR_BAD_HANDLE; + break; + } + + /* + * capacityAvailable and capacityReached have already been + * checked in the agent with regards to each other + */ + for (i = SA_MSG_MESSAGE_HIGHEST_PRIORITY; + i <= SA_MSG_MESSAGE_LOWEST_PRIORITY; + i++) + { + if (qnode->qinfo.size[i] < + evt->msg.mqp_req.info.capacity.thresholds.capacityReached[i]) { + TRACE("size is less than capacity reached"); + err = SA_AIS_ERR_INVALID_PARAM; + break; + } + } + + if (err != SA_AIS_OK) + break; + + qnode->qinfo.thresholds = + evt->msg.mqp_req.info.capacity.thresholds; + } while (false); + + /*Send the resp to MQA */ + memset(&rsp_evt, 0, sizeof(MQSV_EVT)); + + rsp_evt.type = MQSV_EVT_MQP_RSP; + rsp_evt.msg.mqp_rsp.type = MQP_EVT_CAP_SET_RSP; + rsp_evt.msg.mqp_rsp.error = err; + + rc = mqnd_mds_send_rsp(cb, &evt->sinfo, &rsp_evt); + + if (rc != NCSCC_RC_SUCCESS) + LOG_ER( + "Queue Capacity set :Mds Send Response Failed %" PRIx64, + evt->sinfo.dest); + else + 
TRACE_1("Queue Capacity set: Mds Send Response Success"); + + TRACE_LEAVE2("Returned with return code %u", rc); + return rc; +} + +/**************************************************************************** + * Name : mqnd_evt_proc_cap_get + * + * Description : Function to get capacity thresholds of queue + * + * Arguments : + * + * Return Values : NCSCC_RC_SUCCESS/Error. + * + * Notes : None. + *****************************************************************************/ +static uint32_t mqnd_evt_proc_cap_get(MQND_CB *cb, MQSV_EVT *evt) +{ + SaAisErrorT err = SA_AIS_OK; + MQSV_EVT rsp_evt; + MQND_QUEUE_NODE *qnode = NULL; + uint32_t rc = NCSCC_RC_SUCCESS; + + TRACE_ENTER(); + + do { + if (!cb->clm_node_joined) { + err = SA_AIS_ERR_UNAVAILABLE; + break; + } else if (cb->is_restart_done) { + err = SA_AIS_OK; + } else { + LOG_ER("%s:%u: ERR_TRY_AGAIN: MQND is not completely " + "Initialized", + __FILE__, __LINE__); + err = SA_AIS_ERR_TRY_AGAIN; + break; + } + + mqnd_queue_node_get(cb, + evt->msg.mqp_req.info.retTimeSetReq.queueHandle, + &qnode); + + /* If queue not found */ + if (!qnode) { + LOG_ER("ERR_BAD_HANDLE: Get queue node Failed"); + err = SA_AIS_ERR_BAD_HANDLE; + break; + } + } while (false); + + /*Send the resp to MQA */ + memset(&rsp_evt, 0, sizeof(MQSV_EVT)); + + rsp_evt.type = MQSV_EVT_MQP_RSP; + rsp_evt.msg.mqp_rsp.type = MQP_EVT_CAP_GET_RSP; + rsp_evt.msg.mqp_rsp.error = err; + rsp_evt.msg.mqp_rsp.info.capacity.thresholds = qnode->qinfo.thresholds; + + rc = mqnd_mds_send_rsp(cb, &evt->sinfo, &rsp_evt); + + if (rc != NCSCC_RC_SUCCESS) + LOG_ER( + "Queue Capacity get :Mds Send Response Failed %" PRIx64, + evt->sinfo.dest); + else + TRACE_1("Queue Capacity get: Mds Send Response Success"); + + TRACE_LEAVE2("Returned with return code %u", rc); + return rc; +} + +/**************************************************************************** * Name : mqnd_evt_proc_cb_dump * * Description : Function to print the control block infomration diff --git 
a/src/msg/msgnd/mqnd_imm.c b/src/msg/msgnd/mqnd_imm.c index 349790d..feb4aa4 100644 --- a/src/msg/msgnd/mqnd_imm.c +++ b/src/msg/msgnd/mqnd_imm.c @@ -460,7 +460,6 @@ SaAisErrorT mqnd_create_runtime_MsgQPriorityobject(SaStringT rname, SaImmOiHandleT immOiHandle) { SaAisErrorT rc = SA_AIS_OK; - SaUint64T def_val = 0; int i = 0; SaImmAttrValueT arr1[1], arr2[1], arr3[1], arr4[1]; SaImmAttrValuesT_2 attr_mqprio, attr_mqprioSize, attr_capavail, @@ -484,8 +483,8 @@ SaAisErrorT mqnd_create_runtime_MsgQPriorityobject(SaStringT rname, arr1[0] = &mqprdn; arr2[0] = &(qnode->qinfo.size[i]); - arr3[0] = &def_val; /* not implemented */ - arr4[0] = &def_val; /* not implemented */ + arr3[0] = &qnode->qinfo.thresholds.capacityAvailable[i]; + arr4[0] = &qnode->qinfo.thresholds.capacityReached[i]; attr_mqprio.attrName = "safMqPrio"; attr_mqprio.attrValueType = SA_IMM_ATTR_SASTRINGT; diff --git a/src/msg/msgnd/mqnd_util.c b/src/msg/msgnd/mqnd_util.c index 8dd79fb..10c78f5 100644 --- a/src/msg/msgnd/mqnd_util.c +++ b/src/msg/msgnd/mqnd_util.c @@ -79,6 +79,10 @@ uint32_t mqnd_queue_create(MQND_CB *cb, MQP_OPEN_REQ *open, MDS_DEST *rcvr_mqa, i <= SA_MSG_MESSAGE_LOWEST_PRIORITY; i++) { qnode->qinfo.totalQueueSize += open->creationAttributes.size[i]; qnode->qinfo.size[i] = open->creationAttributes.size[i]; + qnode->qinfo.thresholds.capacityReached[i] = + qnode->qinfo.size[i]; + qnode->qinfo.thresholds.capacityAvailable[i] = + qnode->qinfo.size[i]; qnode->qinfo.queueStatus.saMsgQueueUsage[i].queueSize = open->creationAttributes.size[i]; } -- 2.9.5 ------------------------------------------------------------------------------ Check out the vibrant tech community on one of the world's most engaging tech sites, Slashdot.org! http://sdm.link/slashdot _______________________________________________ Opensaf-devel mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/opensaf-devel
