This commit implements Sections 3.4.12 and 6.2.2 of the B.03.01 MSG spec.
---
src/msg/Makefile.am | 8 +-
src/msg/agent/mqa_api.c | 330 +++++++++
src/msg/apitest/test_CapacityThresholds.cc | 1049 ++++++++++++++++++++++++++++
src/msg/apitest/test_ErrUnavailable.cc | 110 ++-
src/msg/common/mqsv_edu.c | 74 +-
src/msg/common/mqsv_evt.h | 15 +-
src/msg/msgd/mqd_api.c | 64 +-
src/msg/msgd/mqd_db.h | 4 +
src/msg/msgd/mqd_ntf.cc | 310 ++++++++
src/msg/msgd/mqd_ntf.h | 49 ++
src/msg/msgnd/mqnd_db.h | 6 +
src/msg/msgnd/mqnd_evt.c | 359 +++++++++-
src/msg/msgnd/mqnd_imm.c | 5 +-
src/msg/msgnd/mqnd_util.c | 4 +
14 files changed, 2374 insertions(+), 13 deletions(-)
create mode 100644 src/msg/apitest/test_CapacityThresholds.cc
create mode 100644 src/msg/msgd/mqd_ntf.cc
create mode 100644 src/msg/msgd/mqd_ntf.h
diff --git a/src/msg/Makefile.am b/src/msg/Makefile.am
index aeec6ca..c308072 100644
--- a/src/msg/Makefile.am
+++ b/src/msg/Makefile.am
@@ -97,6 +97,7 @@ noinst_HEADERS += \
src/msg/msgd/mqd_dl_api.h \
src/msg/msgd/mqd_imm.h \
src/msg/msgd/mqd_mem.h \
+ src/msg/msgd/mqd_ntf.h \
src/msg/msgd/mqd_red.h \
src/msg/msgd/mqd_saf.h \
src/msg/msgd/mqd_tmr.h \
@@ -158,6 +159,7 @@ bin_osafmsgnd_LDADD = \
lib/libmsg_common.la \
lib/libSaAmf.la \
lib/libSaClm.la \
+ lib/libSaNtf.la \
lib/libosaf_common.la \
lib/libSaImmOi.la \
lib/libSaImmOm.la \
@@ -182,7 +184,8 @@ bin_osafmsgd_SOURCES = \
src/msg/msgd/mqd_mds.c \
src/msg/msgd/mqd_red.c \
src/msg/msgd/mqd_saf.c \
- src/msg/msgd/mqd_tmr.c
+ src/msg/msgd/mqd_tmr.c \
+ src/msg/msgd/mqd_ntf.cc
bin_osafmsgd_LDADD = \
lib/libmsg_common.la \
@@ -191,6 +194,7 @@ bin_osafmsgd_LDADD = \
lib/libosaf_common.la \
lib/libSaImmOi.la \
lib/libSaImmOm.la \
+ lib/libSaNtf.la \
lib/libopensaf_core.la
if ENABLE_TESTS
@@ -205,6 +209,7 @@ noinst_HEADERS += \
bin_msgtest_SOURCES = \
src/msg/apitest/msgtest.c \
src/msg/apitest/test_saMsgVersionT.cc \
+ src/msg/apitest/test_CapacityThresholds.cc \
src/msg/apitest/test_ErrUnavailable.cc \
src/msg/apitest/tet_mqa_conf.c \
src/msg/apitest/tet_mqsv_util.c \
@@ -212,6 +217,7 @@ bin_msgtest_SOURCES = \
bin_msgtest_LDADD = \
lib/libSaMsg.la \
+ lib/libSaNtf.la \
lib/libopensaf_core.la \
lib/libapitest.la
diff --git a/src/msg/agent/mqa_api.c b/src/msg/agent/mqa_api.c
index eeec00e..0070aaf 100644
--- a/src/msg/agent/mqa_api.c
+++ b/src/msg/agent/mqa_api.c
@@ -5600,6 +5600,336 @@ done:
return rc;
}
+/****************************************************************************
+  Name          : saMsgQueueCapacityThresholdsSet
+
+  Description   : This routine sets the critical capacity thresholds for the
+                  queue. The arguments are validated locally and the request
+                  is then sent synchronously to the MQND, whose reply
+                  supplies the final return code.
+
+  Arguments     : SaMsgQueueHandleT queueHandle - handle of an open queue
+                : const SaMsgQueueThresholdsT *thresholds - per-priority
+                  thresholds; for every priority capacityAvailable must not
+                  be greater than capacityReached
+
+  Return Values : SaAisErrorT
+                  SA_AIS_ERR_BAD_HANDLE    - control block or queue not found
+                  SA_AIS_ERR_LIBRARY       - control block lock failed
+                  SA_AIS_ERR_VERSION       - client older than
+                                             MQA_MAJOR_VERSION
+                  SA_AIS_ERR_UNAVAILABLE   - node not a cluster member or
+                                             client is stale
+                  SA_AIS_ERR_INVALID_PARAM - thresholds is NULL or
+                                             capacityAvailable >
+                                             capacityReached
+                  SA_AIS_ERR_TRY_AGAIN     - MQND down or MDS send failure
+                  SA_AIS_ERR_TIMEOUT       - MDS send timed out
+                  SA_AIS_ERR_NO_RESOURCES  - other MDS error or no response
+                  otherwise the error code carried in the MQND response
+
+  Notes         : The control block write lock is held until the MQND has
+                  answered or the send has failed.
+                  NOTE(review): confirm that holding the lock across the
+                  synchronous send is intended.
+******************************************************************************/
+SaAisErrorT saMsgQueueCapacityThresholdsSet(SaMsgQueueHandleT queueHandle,
+					    const SaMsgQueueThresholdsT *thresholds)
+{
+	SaAisErrorT rc = SA_AIS_OK;
+	MQA_CB *mqa_cb;
+	bool locked = false;
+
+	do {
+		MQSV_EVT cap_evt;
+		MQSV_EVT *out_evt = 0;
+		MQA_QUEUE_INFO *queue_node;
+		/* full-width so that no NCSCC_RC_* code is truncated */
+		uint32_t mds_rc;
+		int i;
+
+		TRACE_ENTER2("SaMsgQueueHandle %llu ", queueHandle);
+
+		/* retrieve MQA CB */
+		mqa_cb = (MQA_CB *)m_MQSV_MQA_RETRIEVE_MQA_CB;
+		if (!mqa_cb) {
+			TRACE_2("ERR_BAD_HANDLE: Control block retrieval "
+				"failed");
+			rc = SA_AIS_ERR_BAD_HANDLE;
+			break;
+		}
+
+		if (m_NCS_LOCK(&mqa_cb->cb_lock, NCS_LOCK_WRITE) !=
+		    NCSCC_RC_SUCCESS) {
+			TRACE_4("ERR_LIBRARY: Lock failed for control block "
+				"write");
+			rc = SA_AIS_ERR_LIBRARY;
+			break;
+		}
+
+		locked = true;
+
+		/* Check if queueHandle is present in the tree */
+		if ((queue_node = mqa_queue_tree_find_and_add(
+			 mqa_cb, queueHandle, false, NULL, 0)) == NULL) {
+			TRACE_2("ERR_BAD_HANDLE: Queue Database Find Failed");
+			rc = SA_AIS_ERR_BAD_HANDLE;
+			break;
+		}
+
+		/* the thresholds API is not offered to older clients */
+		if (queue_node->client_info->version.majorVersion <
+		    MQA_MAJOR_VERSION) {
+			TRACE_2("ERR_VERSION: client not B.03.01");
+			rc = SA_AIS_ERR_VERSION;
+			break;
+		}
+
+		if (queue_node->client_info->version.majorVersion ==
+		    MQA_MAJOR_VERSION) {
+			if (!mqa_cb->clm_node_joined ||
+			    queue_node->client_info->isStale) {
+				TRACE_2("ERR_UNAVAILABLE: node is not cluster "
+					"member");
+				rc = SA_AIS_ERR_UNAVAILABLE;
+				break;
+			}
+		}
+
+		if (!thresholds) {
+			TRACE_2("ERR_INVALID_PARAM: thresholds is 0");
+			rc = SA_AIS_ERR_INVALID_PARAM;
+			break;
+		}
+
+		/* every priority must satisfy available <= reached */
+		for (i = SA_MSG_MESSAGE_HIGHEST_PRIORITY;
+		     i <= SA_MSG_MESSAGE_LOWEST_PRIORITY;
+		     i++) {
+			if (thresholds->capacityAvailable[i] >
+			    thresholds->capacityReached[i]) {
+				TRACE_2("ERR_INVALID_PARAM: Available greater "
+					"than Reached");
+				rc = SA_AIS_ERR_INVALID_PARAM;
+				break;
+			}
+		}
+
+		/* the break above only left the for loop */
+		if (rc != SA_AIS_OK)
+			break;
+
+		/* Check if mqnd is up */
+		if (!mqa_cb->is_mqnd_up) {
+			TRACE_2("ERR_TRY_AGAIN: MQND is down");
+			rc = SA_AIS_ERR_TRY_AGAIN;
+			break;
+		}
+
+		/* populate the structure */
+		memset(&cap_evt, 0, sizeof(MQSV_EVT));
+		cap_evt.type = MQSV_EVT_MQP_REQ;
+		cap_evt.msg.mqp_req.type = MQP_EVT_CAP_SET_REQ;
+		cap_evt.msg.mqp_req.info.capacity.queueHandle = queueHandle;
+		cap_evt.msg.mqp_req.info.capacity.thresholds = *thresholds;
+		cap_evt.msg.mqp_req.agent_mds_dest = mqa_cb->mqa_mds_dest;
+
+		/* send the request to the MQND */
+		mds_rc = mqa_mds_msg_sync_send(mqa_cb->mqa_mds_hdl,
+					       &mqa_cb->mqnd_mds_dest,
+					       &cap_evt,
+					       &out_evt,
+					       MQSV_WAIT_TIME);
+
+		switch (mds_rc) {
+		case NCSCC_RC_SUCCESS:
+			break;
+
+		case NCSCC_RC_REQ_TIMOUT:
+			TRACE_2("ERR_TIMEOUT: Message Send through MDS "
+				"Timeout %" PRIx64,
+				mqa_cb->mqa_mds_dest);
+			rc = SA_AIS_ERR_TIMEOUT;
+			break;
+
+		case NCSCC_RC_FAILURE:
+			TRACE_2("ERR_TRY_AGAIN: Message Send through "
+				"MDS Failure %" PRIx64,
+				mqa_cb->mqa_mds_dest);
+			rc = SA_AIS_ERR_TRY_AGAIN;
+			break;
+
+		default:
+			TRACE_4("ERR_RESOURCES: Message Send through "
+				"MDS Failure %" PRIx64,
+				mqa_cb->mqa_mds_dest);
+			rc = SA_AIS_ERR_NO_RESOURCES;
+			break;
+		}
+
+		if (mds_rc != NCSCC_RC_SUCCESS)
+			break;
+
+		/* the MQND reply carries the outcome of the operation */
+		if (out_evt) {
+			rc = out_evt->msg.mqp_rsp.error;
+			m_MMGR_FREE_MQA_EVT(out_evt);
+		} else {
+			TRACE_4("ERR_RESOURCES: Response not received from "
+				"MQND");
+			rc = SA_AIS_ERR_NO_RESOURCES;
+		}
+	} while (false);
+
+	if (locked)
+		m_NCS_UNLOCK(&mqa_cb->cb_lock, NCS_LOCK_WRITE);
+
+	if (mqa_cb)
+		m_MQSV_MQA_GIVEUP_MQA_CB;
+
+	if (rc == SA_AIS_OK) {
+		TRACE_LEAVE2(" Success ");
+	} else {
+		TRACE_LEAVE2(" Failed with return code - %d ", rc);
+	}
+
+	return rc;
+}
+
+/****************************************************************************
+  Name          : saMsgQueueCapacityThresholdsGet
+
+  Description   : This routine gets the critical capacity thresholds for the
+                  queue. The request is sent synchronously to the MQND and,
+                  when the MQND reports SA_AIS_OK, the thresholds from its
+                  reply are copied to the caller.
+
+  Arguments     : SaMsgQueueHandleT queueHandle - handle of an open queue
+                : SaMsgQueueThresholdsT *thresholds - output; written only
+                  when SA_AIS_OK is returned
+
+  Return Values : SaAisErrorT
+                  SA_AIS_ERR_BAD_HANDLE    - control block or queue not found
+                  SA_AIS_ERR_LIBRARY       - control block lock failed
+                  SA_AIS_ERR_VERSION       - client older than
+                                             MQA_MAJOR_VERSION
+                  SA_AIS_ERR_UNAVAILABLE   - node not a cluster member or
+                                             client is stale
+                  SA_AIS_ERR_INVALID_PARAM - thresholds is NULL
+                  SA_AIS_ERR_TRY_AGAIN     - MQND down or MDS send failure
+                  SA_AIS_ERR_TIMEOUT       - MDS send timed out
+                  SA_AIS_ERR_NO_RESOURCES  - other MDS error or no response
+                  otherwise the error code carried in the MQND response
+
+  Notes         : The control block write lock is held until the MQND has
+                  answered or the send has failed.
+                  NOTE(review): confirm that holding the lock across the
+                  synchronous send is intended.
+******************************************************************************/
+SaAisErrorT saMsgQueueCapacityThresholdsGet(SaMsgQueueHandleT queueHandle,
+					    SaMsgQueueThresholdsT *thresholds)
+{
+	SaAisErrorT rc = SA_AIS_OK;
+	MQA_CB *mqa_cb;
+	bool locked = false;
+
+	do {
+		MQSV_EVT cap_evt;
+		MQSV_EVT *out_evt = 0;
+		MQA_QUEUE_INFO *queue_node;
+		/* full-width so that no NCSCC_RC_* code is truncated */
+		uint32_t mds_rc;
+
+		TRACE_ENTER2("SaMsgQueueHandle %llu ", queueHandle);
+
+		/* retrieve MQA CB */
+		mqa_cb = (MQA_CB *)m_MQSV_MQA_RETRIEVE_MQA_CB;
+		if (!mqa_cb) {
+			TRACE_2("ERR_BAD_HANDLE: Control block retrieval "
+				"failed");
+			rc = SA_AIS_ERR_BAD_HANDLE;
+			break;
+		}
+
+		if (m_NCS_LOCK(&mqa_cb->cb_lock, NCS_LOCK_WRITE) !=
+		    NCSCC_RC_SUCCESS) {
+			TRACE_4("ERR_LIBRARY: Lock failed for control block "
+				"write");
+			rc = SA_AIS_ERR_LIBRARY;
+			break;
+		}
+
+		locked = true;
+
+		/* Check if queueHandle is present in the tree */
+		if ((queue_node = mqa_queue_tree_find_and_add(
+			 mqa_cb, queueHandle, false, NULL, 0)) == NULL) {
+			TRACE_2("ERR_BAD_HANDLE: Queue Database Find Failed");
+			rc = SA_AIS_ERR_BAD_HANDLE;
+			break;
+		}
+
+		/* the thresholds API is not offered to older clients */
+		if (queue_node->client_info->version.majorVersion <
+		    MQA_MAJOR_VERSION) {
+			TRACE_2("ERR_VERSION: client not B.03.01");
+			rc = SA_AIS_ERR_VERSION;
+			break;
+		}
+
+		if (queue_node->client_info->version.majorVersion ==
+		    MQA_MAJOR_VERSION) {
+			if (!mqa_cb->clm_node_joined ||
+			    queue_node->client_info->isStale) {
+				TRACE_2("ERR_UNAVAILABLE: node is not cluster "
+					"member");
+				rc = SA_AIS_ERR_UNAVAILABLE;
+				break;
+			}
+		}
+
+		if (!thresholds) {
+			TRACE_2("ERR_INVALID_PARAM: thresholds is 0");
+			rc = SA_AIS_ERR_INVALID_PARAM;
+			break;
+		}
+
+		/* Check if mqnd is up (only the MQND flag is consulted) */
+		if (!mqa_cb->is_mqnd_up) {
+			TRACE_2("ERR_TRY_AGAIN: MQND is down");
+			rc = SA_AIS_ERR_TRY_AGAIN;
+			break;
+		}
+
+		/* populate the structure */
+		memset(&cap_evt, 0, sizeof(MQSV_EVT));
+		cap_evt.type = MQSV_EVT_MQP_REQ;
+		cap_evt.msg.mqp_req.type = MQP_EVT_CAP_GET_REQ;
+		cap_evt.msg.mqp_req.info.capacity.queueHandle = queueHandle;
+		cap_evt.msg.mqp_req.agent_mds_dest = mqa_cb->mqa_mds_dest;
+
+		/* send the request to the MQND */
+		mds_rc = mqa_mds_msg_sync_send(mqa_cb->mqa_mds_hdl,
+					       &mqa_cb->mqnd_mds_dest,
+					       &cap_evt,
+					       &out_evt,
+					       MQSV_WAIT_TIME);
+
+		switch (mds_rc) {
+		case NCSCC_RC_SUCCESS:
+			break;
+
+		case NCSCC_RC_REQ_TIMOUT:
+			TRACE_2("ERR_TIMEOUT: Message Send through MDS "
+				"Timeout %" PRIx64,
+				mqa_cb->mqa_mds_dest);
+			rc = SA_AIS_ERR_TIMEOUT;
+			break;
+
+		case NCSCC_RC_FAILURE:
+			TRACE_2("ERR_TRY_AGAIN: Message Send through "
+				"MDS Failure %" PRIx64,
+				mqa_cb->mqa_mds_dest);
+			rc = SA_AIS_ERR_TRY_AGAIN;
+			break;
+
+		default:
+			TRACE_4("ERR_RESOURCES: Message Send through "
+				"MDS Failure %" PRIx64,
+				mqa_cb->mqa_mds_dest);
+			rc = SA_AIS_ERR_NO_RESOURCES;
+			break;
+		}
+
+		if (mds_rc != NCSCC_RC_SUCCESS)
+			break;
+
+		if (out_evt) {
+			rc = out_evt->msg.mqp_rsp.error;
+
+			/* hand the thresholds back only on success */
+			if (rc == SA_AIS_OK) {
+				*thresholds = out_evt->msg.mqp_rsp.info.
+					capacity.thresholds;
+			}
+
+			m_MMGR_FREE_MQA_EVT(out_evt);
+		} else {
+			TRACE_4("ERR_RESOURCES: Response not received from "
+				"MQND");
+			rc = SA_AIS_ERR_NO_RESOURCES;
+		}
+	} while (false);
+
+	if (locked)
+		m_NCS_UNLOCK(&mqa_cb->cb_lock, NCS_LOCK_WRITE);
+
+	if (mqa_cb)
+		m_MQSV_MQA_GIVEUP_MQA_CB;
+
+	if (rc == SA_AIS_OK) {
+		TRACE_LEAVE2(" Success ");
+	} else {
+		TRACE_LEAVE2(" Failed with return code - %d ", rc);
+	}
+
+	return rc;
+}
/*****************************************************************************
Name : msgget_timer_expired
diff --git a/src/msg/apitest/test_CapacityThresholds.cc b/src/msg/apitest/test_CapacityThresholds.cc
new file mode 100644
index 0000000..38d2144
--- /dev/null
+++ b/src/msg/apitest/test_CapacityThresholds.cc
@@ -0,0 +1,1049 @@
+#include <cassert>
+#include <condition_variable>
+#include <cstdio>
+#include <cstring>
+#include <iostream>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <sys/poll.h>
+#include "msg/apitest/msgtest.h"
+#include "msg/apitest/tet_mqsv.h"
+#include <saMsg.h>
+#include <saNtf.h>
+
+// Version handed to saMsgInitialize by the tests that need the thresholds API.
+static SaVersionT msg3_1 = { 'B', 3, 0 };
+
+// Capacity statuses the notification callback is expected to see, in order.
+using TestQueue = std::queue<SaMsgMessageCapacityStatusT>;
+// One TET_PASS/TET_FAIL verdict per notification the callback has examined.
+using ResultQueue = std::queue<int>;
+
+static TestQueue testQueue;
+static ResultQueue resultQueue;
+
+// m/cv/ready let a test wait until the callback has drained testQueue.
+static std::mutex m;
+static std::condition_variable cv;
+static bool ready = false;
+// Tells the dispatch thread (testNtfStateChange) to stop polling.
+static bool pollDone = false;
+
+static const SaNameT queueName = {
+  sizeof("safMq=TestQueue") - 1,
+  "safMq=TestQueue"
+};
+
+static const SaNameT queueGroupName = {
+  sizeof("safMqg=TestQueueGroup") - 1,
+  "safMqg=TestQueueGroup"
+};
+
+// Attributes used for every queue opened through openQueue().
+static const SaMsgQueueCreationAttributesT creationAttributes = {
+  0,
+  { 5, 5, 5, 5 },
+  SA_TIME_ONE_SECOND
+};
+
+// Opens the shared test queue (queueName, creationAttributes) with
+// SA_MSG_QUEUE_CREATE on the given library handle and returns the queue
+// handle. The open is asserted to succeed.
+static SaMsgQueueHandleT openQueue(SaMsgHandleT msgHandle) {
+  SaMsgQueueHandleT handle;
+  const SaAisErrorT status = saMsgQueueOpen(
+      msgHandle, &queueName, &creationAttributes, SA_MSG_QUEUE_CREATE,
+      SA_TIME_MAX, &handle);
+
+  assert(status == SA_AIS_OK);
+  return handle;
+}
+
+
+// What a notification must look like for one expected capacity status.
+struct CapacityExpectation {
+  int status;               // value queued in testQueue; also the newState
+  int minorId;              // notificationClassId->minorId
+  SaBoolT oldStatePresent;  // required value of oldStatePresent
+  int oldState;             // compared only when oldStatePresent is SA_TRUE
+};
+
+// Compares a delivered notification with the expected capacity status and
+// returns TET_PASS when every checked field matches, TET_FAIL otherwise.
+static int verifyCapacityNotification(const SaNtfNotificationsT *n,
+                                      int expected) {
+  static const CapacityExpectation expectations[] = {
+    { SA_MSG_QUEUE_CAPACITY_REACHED, 0x65, SA_FALSE, 0 },
+    { SA_MSG_QUEUE_CAPACITY_AVAILABLE, 0x66, SA_TRUE,
+      SA_MSG_QUEUE_CAPACITY_REACHED },
+    { SA_MSG_QUEUE_GROUP_CAPACITY_REACHED, 0x67, SA_FALSE, 0 },
+    { SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE, 0x68, SA_TRUE,
+      SA_MSG_QUEUE_GROUP_CAPACITY_REACHED },
+  };
+  static const char notifier[] = "safApp=safMsgService";
+
+  if (n->notificationType != SA_NTF_TYPE_STATE_CHANGE)
+    return TET_FAIL;
+
+  const SaNtfStateChangeNotificationT& sc =
+      n->notification.stateChangeNotification;
+  const SaNtfNotificationHeaderT& hdr = sc.notificationHeader;
+
+  // Checks shared by all four capacity notifications.
+  if (hdr.notificationClassId->vendorId != SA_NTF_VENDOR_ID_SAF)
+    return TET_FAIL;
+
+  if (hdr.notificationClassId->majorId != SA_SVC_MSG)
+    return TET_FAIL;
+
+  if (sc.numStateChanges != 1)
+    return TET_FAIL;
+
+  if (*sc.sourceIndicator != SA_NTF_OBJECT_OPERATION)
+    return TET_FAIL;
+
+  if (sc.changedStates[0].stateId != SA_MSG_DEST_CAPACITY_STATUS)
+    return TET_FAIL;
+
+  if (hdr.notifyingObject->length != sizeof(notifier) - 1)
+    return TET_FAIL;
+
+  if (memcmp(hdr.notifyingObject->value, notifier,
+             hdr.notifyingObject->length))
+    return TET_FAIL;
+
+  // Checks that depend on which status the test is waiting for.
+  for (const CapacityExpectation& e : expectations) {
+    if (e.status != expected)
+      continue;
+
+    if (hdr.notificationClassId->minorId != e.minorId)
+      return TET_FAIL;
+
+    if (sc.changedStates[0].newState != e.status)
+      return TET_FAIL;
+
+    if (sc.changedStates[0].oldStatePresent != e.oldStatePresent)
+      return TET_FAIL;
+
+    if (e.oldStatePresent == SA_TRUE &&
+        sc.changedStates[0].oldState != e.oldState)
+      return TET_FAIL;
+
+    return TET_PASS;
+  }
+
+  // A test queued a status this callback does not know how to verify.
+  assert(false);
+  return TET_FAIL;
+}
+
+// Gives a delivered state-change notification back to the NTF library.
+// Only a state-change filter is subscribed by testNtfInit, so other
+// notification types are not expected here and are left untouched.
+static void releaseNotification(const SaNtfNotificationsT *n) {
+  if (n->notificationType == SA_NTF_TYPE_STATE_CHANGE) {
+    saNtfNotificationFree(
+        n->notification.stateChangeNotification.notificationHandle);
+  }
+}
+
+// NTF notification callback used by the capacity tests.
+//
+// Takes the next expected status from testQueue, verifies the notification
+// against it, pushes the verdict onto resultQueue and, once no further
+// notifications are expected, sets `ready` and wakes the waiting test.
+//
+// A notification that arrives while nothing is expected is released and
+// ignored; calling front() on the empty queue would be undefined behaviour.
+static void ntfCallback(SaNtfSubscriptionIdT subscriptionId,
+                        const SaNtfNotificationsT *n) {
+  if (testQueue.empty()) {
+    releaseNotification(n);
+    return;
+  }
+
+  const int asyncTest(testQueue.front());
+  testQueue.pop();
+
+  resultQueue.push(verifyCapacityNotification(n, asyncTest));
+
+  // The callback owns the notification and must free it after use.
+  releaseNotification(n);
+
+  if (testQueue.empty()) {
+    {
+      std::lock_guard<std::mutex> lk(m);
+      ready = true;
+    }
+
+    cv.notify_one();
+  }
+}
+
+// Body of the dispatch thread: polls the NTF selection object with a one
+// second timeout and dispatches one pending callback whenever it becomes
+// readable. Returns when pollDone is set, when poll() fails, or when
+// saNtfDispatch() reports an error (SA_AIS_ERR_BAD_HANDLE is not logged).
+static void testNtfStateChange(SaNtfHandleT ntfHandle,
+                               SaSelectionObjectT ntfSelObj) {
+  pollfd pfd = { static_cast<int>(ntfSelObj), POLLIN };
+
+  while (!pollDone) {
+    if (poll(&pfd, 1, 1000) < 0) {
+      m_TET_MQSV_PRINTF("poll FAILED: %i\n", errno);
+      return;
+    }
+
+    // Timeout or an event other than POLLIN: re-check pollDone and retry.
+    if (!(pfd.revents & POLLIN))
+      continue;
+
+    const SaAisErrorT status = saNtfDispatch(ntfHandle, SA_DISPATCH_ONE);
+    if (status == SA_AIS_OK)
+      continue;
+
+    if (status != SA_AIS_ERR_BAD_HANDLE)
+      std::cerr << "saNtfDispatch failed: " << status << std::endl;
+    return;
+  }
+}
+
+// Initializes an NTF handle with ntfCallback installed, subscribes it to
+// state-change notifications (subscription id 1) and fetches its selection
+// object.
+//
+// ntfHandle  out: the initialized handle (valid only on TET_PASS)
+// ntfSelObj  out: selection object to poll for pending callbacks
+//
+// Returns TET_PASS on success. On any failure returns TET_FAIL after
+// releasing whatever had been acquired, so the caller has nothing to clean
+// up. pollDone is cleared only on success.
+static int testNtfInit(SaNtfHandleT& ntfHandle, SaSelectionObjectT& ntfSelObj) {
+  SaNtfCallbacksT callbacks = { ntfCallback, 0 };
+  SaVersionT version = { 'A', 1, 0 };
+
+  if (saNtfInitialize(&ntfHandle, &callbacks, &version) != SA_AIS_OK)
+    return TET_FAIL;
+
+  int result(TET_FAIL);
+
+  do {
+    SaNtfStateChangeNotificationFilterT stateChangeFilter;
+
+    SaAisErrorT rc(saNtfStateChangeNotificationFilterAllocate(
+        ntfHandle, &stateChangeFilter, 0, 0, 0, 0, 0, 0));
+    if (rc != SA_AIS_OK)
+      break;
+
+    // Only the state-change slot (third member) carries a filter.
+    SaNtfNotificationTypeFilterHandlesT notificationFilterHandles = {
+      0, 0, stateChangeFilter.notificationFilterHandle, 0, 0
+    };
+
+    rc = saNtfNotificationSubscribe(&notificationFilterHandles, 1);
+
+    // The filter is released whether or not the subscription worked.
+    const SaAisErrorT freeRc(
+        saNtfNotificationFilterFree(stateChangeFilter.notificationFilterHandle));
+
+    if (rc != SA_AIS_OK || freeRc != SA_AIS_OK)
+      break;
+
+    rc = saNtfSelectionObjectGet(ntfHandle, &ntfSelObj);
+    if (rc != SA_AIS_OK)
+      break;
+
+    pollDone = false;
+    result = TET_PASS;
+  } while (false);
+
+  // Callers do not call testNtfCleanup() after a failed init.
+  if (result != TET_PASS)
+    saNtfFinalize(ntfHandle);
+
+  return result;
+}
+
+// Finalizes the NTF handle and then tells the dispatch thread to stop by
+// setting pollDone. Returns TET_PASS if saNtfFinalize succeeded, TET_FAIL
+// otherwise; pollDone is set in both cases.
+static int testNtfCleanup(SaNtfHandleT ntfHandle) {
+  const bool finalized = (saNtfFinalize(ntfHandle) == SA_AIS_OK);
+
+  pollDone = true;
+
+  return finalized ? TET_PASS : TET_FAIL;
+}
+
+// Set on a queue handle that was never opened (0xdeadbeef); the result is
+// validated against SA_AIS_ERR_BAD_HANDLE.
+static void capacityThresholds_01(void) {
+  SaMsgQueueThresholdsT unused;
+  const SaAisErrorT status =
+      saMsgQueueCapacityThresholdsSet(0xdeadbeef, &unused);
+  aisrc_validate(status, SA_AIS_ERR_BAD_HANDLE);
+}
+
+// Set on a queue handle whose library handle has already been finalized; the
+// result is validated against SA_AIS_ERR_BAD_HANDLE.
+static void capacityThresholds_02(void) {
+  SaMsgHandleT handle;
+  SaAisErrorT status(saMsgInitialize(&handle, nullptr, &msg3_1));
+  assert(status == SA_AIS_OK);
+
+  const SaMsgQueueHandleT queue = openQueue(handle);
+
+  status = saMsgFinalize(handle);
+  assert(status == SA_AIS_OK);
+
+  SaMsgQueueThresholdsT unused;
+  status = saMsgQueueCapacityThresholdsSet(queue, &unused);
+  aisrc_validate(status, SA_AIS_ERR_BAD_HANDLE);
+}
+
+// Set on a queue handle that has been closed; the result is validated
+// against SA_AIS_ERR_BAD_HANDLE.
+static void capacityThresholds_03(void) {
+  SaMsgHandleT handle;
+  SaAisErrorT status(saMsgInitialize(&handle, nullptr, &msg3_1));
+  assert(status == SA_AIS_OK);
+
+  const SaMsgQueueHandleT queue = openQueue(handle);
+
+  status = saMsgQueueClose(queue);
+  assert(status == SA_AIS_OK);
+
+  SaMsgQueueThresholdsT unused;
+  status = saMsgQueueCapacityThresholdsSet(queue, &unused);
+  aisrc_validate(status, SA_AIS_ERR_BAD_HANDLE);
+
+  status = saMsgFinalize(handle);
+  assert(status == SA_AIS_OK);
+}
+
+// Set with a null thresholds pointer on an open queue; the result is
+// validated against SA_AIS_ERR_INVALID_PARAM.
+static void capacityThresholds_04(void) {
+  SaMsgHandleT handle;
+  SaAisErrorT status(saMsgInitialize(&handle, nullptr, &msg3_1));
+  assert(status == SA_AIS_OK);
+
+  const SaMsgQueueHandleT queue = openQueue(handle);
+
+  status = saMsgQueueCapacityThresholdsSet(queue, nullptr);
+  aisrc_validate(status, SA_AIS_ERR_INVALID_PARAM);
+
+  status = saMsgFinalize(handle);
+  assert(status == SA_AIS_OK);
+}
+
+// Set where the first array holds 4 at index 0 while the second array holds
+// 5 there; the result is validated against SA_AIS_ERR_INVALID_PARAM.
+static void capacityThresholds_05(void) {
+  SaMsgHandleT handle;
+  SaAisErrorT status(saMsgInitialize(&handle, nullptr, &msg3_1));
+  assert(status == SA_AIS_OK);
+
+  const SaMsgQueueHandleT queue = openQueue(handle);
+
+  const SaMsgQueueThresholdsT limits = {
+    { 4, 5, 5, 5 }, { 5, 5, 5, 5 }
+  };
+  status = saMsgQueueCapacityThresholdsSet(queue, &limits);
+  aisrc_validate(status, SA_AIS_ERR_INVALID_PARAM);
+
+  status = saMsgFinalize(handle);
+  assert(status == SA_AIS_OK);
+}
+
+// Set where the first array holds 4 at index 1 while the second array holds
+// 5 there; the result is validated against SA_AIS_ERR_INVALID_PARAM.
+static void capacityThresholds_06(void) {
+  SaMsgHandleT handle;
+  SaAisErrorT status(saMsgInitialize(&handle, nullptr, &msg3_1));
+  assert(status == SA_AIS_OK);
+
+  const SaMsgQueueHandleT queue = openQueue(handle);
+
+  const SaMsgQueueThresholdsT limits = {
+    { 5, 4, 5, 5 }, { 5, 5, 5, 5 }
+  };
+  status = saMsgQueueCapacityThresholdsSet(queue, &limits);
+  aisrc_validate(status, SA_AIS_ERR_INVALID_PARAM);
+
+  status = saMsgFinalize(handle);
+  assert(status == SA_AIS_OK);
+}
+
+// Set where the first array holds 4 at index 2 while the second array holds
+// 5 there; the result is validated against SA_AIS_ERR_INVALID_PARAM.
+static void capacityThresholds_07(void) {
+  SaMsgHandleT handle;
+  SaAisErrorT status(saMsgInitialize(&handle, nullptr, &msg3_1));
+  assert(status == SA_AIS_OK);
+
+  const SaMsgQueueHandleT queue = openQueue(handle);
+
+  const SaMsgQueueThresholdsT limits = {
+    { 5, 5, 4, 5 }, { 5, 5, 5, 5 }
+  };
+  status = saMsgQueueCapacityThresholdsSet(queue, &limits);
+  aisrc_validate(status, SA_AIS_ERR_INVALID_PARAM);
+
+  status = saMsgFinalize(handle);
+  assert(status == SA_AIS_OK);
+}
+
+// Set where the first array holds 4 at index 3 while the second array holds
+// 5 there; the result is validated against SA_AIS_ERR_INVALID_PARAM.
+static void capacityThresholds_08(void) {
+  SaMsgHandleT handle;
+  SaAisErrorT status(saMsgInitialize(&handle, nullptr, &msg3_1));
+  assert(status == SA_AIS_OK);
+
+  const SaMsgQueueHandleT queue = openQueue(handle);
+
+  const SaMsgQueueThresholdsT limits = {
+    { 5, 5, 5, 4 }, { 5, 5, 5, 5 }
+  };
+  status = saMsgQueueCapacityThresholdsSet(queue, &limits);
+  aisrc_validate(status, SA_AIS_ERR_INVALID_PARAM);
+
+  status = saMsgFinalize(handle);
+  assert(status == SA_AIS_OK);
+}
+
+// Set through a library handle initialized with version {'B', 1, 0}; the
+// result is validated against SA_AIS_ERR_VERSION.
+static void capacityThresholds_09(void) {
+  SaVersionT oldVersion = { 'B', 1, 0 };
+  SaMsgHandleT handle;
+  SaAisErrorT status(saMsgInitialize(&handle, nullptr, &oldVersion));
+  assert(status == SA_AIS_OK);
+
+  const SaMsgQueueHandleT queue = openQueue(handle);
+
+  const SaMsgQueueThresholdsT limits = {
+    { 5, 5, 5, 5 }, { 5, 5, 5, 5 }
+  };
+  status = saMsgQueueCapacityThresholdsSet(queue, &limits);
+  aisrc_validate(status, SA_AIS_ERR_VERSION);
+
+  status = saMsgFinalize(handle);
+  assert(status == SA_AIS_OK);
+}
+
+// Set where the first array holds 6 at index 0, above the size of 5 used in
+// creationAttributes; the result is validated against
+// SA_AIS_ERR_INVALID_PARAM.
+static void capacityThresholds_10(void) {
+  SaMsgHandleT handle;
+  SaAisErrorT status(saMsgInitialize(&handle, nullptr, &msg3_1));
+  assert(status == SA_AIS_OK);
+
+  const SaMsgQueueHandleT queue = openQueue(handle);
+
+  const SaMsgQueueThresholdsT limits = {
+    { 6, 5, 5, 5 }, { 5, 5, 5, 5 }
+  };
+  status = saMsgQueueCapacityThresholdsSet(queue, &limits);
+  aisrc_validate(status, SA_AIS_ERR_INVALID_PARAM);
+
+  status = saMsgFinalize(handle);
+  assert(status == SA_AIS_OK);
+}
+
+// Set where the first array holds 6 at index 1, above the size of 5 used in
+// creationAttributes; the result is validated against
+// SA_AIS_ERR_INVALID_PARAM.
+static void capacityThresholds_11(void) {
+  SaMsgHandleT handle;
+  SaAisErrorT status(saMsgInitialize(&handle, nullptr, &msg3_1));
+  assert(status == SA_AIS_OK);
+
+  const SaMsgQueueHandleT queue = openQueue(handle);
+
+  const SaMsgQueueThresholdsT limits = {
+    { 5, 6, 5, 5 }, { 5, 5, 5, 5 }
+  };
+  status = saMsgQueueCapacityThresholdsSet(queue, &limits);
+  aisrc_validate(status, SA_AIS_ERR_INVALID_PARAM);
+
+  status = saMsgFinalize(handle);
+  assert(status == SA_AIS_OK);
+}
+
+// Set where the first array holds 6 at index 2, above the size of 5 used in
+// creationAttributes; the result is validated against
+// SA_AIS_ERR_INVALID_PARAM.
+static void capacityThresholds_12(void) {
+  SaMsgHandleT handle;
+  SaAisErrorT status(saMsgInitialize(&handle, nullptr, &msg3_1));
+  assert(status == SA_AIS_OK);
+
+  const SaMsgQueueHandleT queue = openQueue(handle);
+
+  const SaMsgQueueThresholdsT limits = {
+    { 5, 5, 6, 5 }, { 5, 5, 5, 5 }
+  };
+  status = saMsgQueueCapacityThresholdsSet(queue, &limits);
+  aisrc_validate(status, SA_AIS_ERR_INVALID_PARAM);
+
+  status = saMsgFinalize(handle);
+  assert(status == SA_AIS_OK);
+}
+
+// Set where the first array holds 6 at index 3, above the size of 5 used in
+// creationAttributes; the result is validated against
+// SA_AIS_ERR_INVALID_PARAM.
+static void capacityThresholds_13(void) {
+  SaMsgHandleT handle;
+  SaAisErrorT status(saMsgInitialize(&handle, nullptr, &msg3_1));
+  assert(status == SA_AIS_OK);
+
+  const SaMsgQueueHandleT queue = openQueue(handle);
+
+  const SaMsgQueueThresholdsT limits = {
+    { 5, 5, 5, 6 }, { 5, 5, 5, 5 }
+  };
+  status = saMsgQueueCapacityThresholdsSet(queue, &limits);
+  aisrc_validate(status, SA_AIS_ERR_INVALID_PARAM);
+
+  status = saMsgFinalize(handle);
+  assert(status == SA_AIS_OK);
+}
+
+// Set with both arrays equal to { 5, 5, 5, 5 } on an open queue; the result
+// is validated against SA_AIS_OK.
+static void capacityThresholds_14(void) {
+  SaMsgHandleT handle;
+  SaAisErrorT status(saMsgInitialize(&handle, nullptr, &msg3_1));
+  assert(status == SA_AIS_OK);
+
+  const SaMsgQueueHandleT queue = openQueue(handle);
+
+  const SaMsgQueueThresholdsT limits = {
+    { 5, 5, 5, 5 }, { 5, 5, 5, 5 }
+  };
+  status = saMsgQueueCapacityThresholdsSet(queue, &limits);
+  aisrc_validate(status, SA_AIS_OK);
+
+  status = saMsgFinalize(handle);
+  assert(status == SA_AIS_OK);
+}
+
+// Get on a queue handle that was never opened (0xdeadbeef); the result is
+// validated against SA_AIS_ERR_BAD_HANDLE.
+static void capacityThresholds_15(void) {
+  SaMsgQueueThresholdsT unused;
+  const SaAisErrorT status =
+      saMsgQueueCapacityThresholdsGet(0xdeadbeef, &unused);
+  aisrc_validate(status, SA_AIS_ERR_BAD_HANDLE);
+}
+
+// Get on a queue handle whose library handle has already been finalized; the
+// result is validated against SA_AIS_ERR_BAD_HANDLE.
+static void capacityThresholds_16(void) {
+  SaMsgHandleT handle;
+  SaAisErrorT status(saMsgInitialize(&handle, nullptr, &msg3_1));
+  assert(status == SA_AIS_OK);
+
+  const SaMsgQueueHandleT queue = openQueue(handle);
+
+  status = saMsgFinalize(handle);
+  assert(status == SA_AIS_OK);
+
+  SaMsgQueueThresholdsT unused;
+  status = saMsgQueueCapacityThresholdsGet(queue, &unused);
+  aisrc_validate(status, SA_AIS_ERR_BAD_HANDLE);
+}
+
+// Get on a queue handle that has been closed; the result is validated
+// against SA_AIS_ERR_BAD_HANDLE.
+static void capacityThresholds_17(void) {
+  SaMsgHandleT handle;
+  SaAisErrorT status(saMsgInitialize(&handle, nullptr, &msg3_1));
+  assert(status == SA_AIS_OK);
+
+  const SaMsgQueueHandleT queue = openQueue(handle);
+
+  status = saMsgQueueClose(queue);
+  assert(status == SA_AIS_OK);
+
+  SaMsgQueueThresholdsT unused;
+  status = saMsgQueueCapacityThresholdsGet(queue, &unused);
+  aisrc_validate(status, SA_AIS_ERR_BAD_HANDLE);
+
+  status = saMsgFinalize(handle);
+  assert(status == SA_AIS_OK);
+}
+
+// Get with a null output pointer on an open queue; the result is validated
+// against SA_AIS_ERR_INVALID_PARAM.
+static void capacityThresholds_18(void) {
+  SaMsgHandleT handle;
+  SaAisErrorT status(saMsgInitialize(&handle, nullptr, &msg3_1));
+  assert(status == SA_AIS_OK);
+
+  const SaMsgQueueHandleT queue = openQueue(handle);
+
+  status = saMsgQueueCapacityThresholdsGet(queue, nullptr);
+  aisrc_validate(status, SA_AIS_ERR_INVALID_PARAM);
+
+  status = saMsgFinalize(handle);
+  assert(status == SA_AIS_OK);
+}
+
+// Get through a library handle initialized with version {'B', 1, 0}; the
+// result is validated against SA_AIS_ERR_VERSION.
+static void capacityThresholds_19(void) {
+  SaVersionT oldVersion = { 'B', 1, 0 };
+  SaMsgHandleT handle;
+  SaAisErrorT status(saMsgInitialize(&handle, nullptr, &oldVersion));
+  assert(status == SA_AIS_OK);
+
+  const SaMsgQueueHandleT queue = openQueue(handle);
+
+  SaMsgQueueThresholdsT unused;
+  status = saMsgQueueCapacityThresholdsGet(queue, &unused);
+  aisrc_validate(status, SA_AIS_ERR_VERSION);
+
+  status = saMsgFinalize(handle);
+  assert(status == SA_AIS_OK);
+}
+
+// Round trip: Set { 1, 2, 3, 4 } for both arrays, then Get. The Get result is
+// validated against SA_AIS_OK and the returned structure is asserted to be
+// byte-identical to what was written.
+static void capacityThresholds_20(void) {
+  SaMsgHandleT handle;
+  SaAisErrorT status(saMsgInitialize(&handle, nullptr, &msg3_1));
+  assert(status == SA_AIS_OK);
+
+  const SaMsgQueueHandleT queue = openQueue(handle);
+
+  const SaMsgQueueThresholdsT written = {
+    { 1, 2, 3, 4 }, { 1, 2, 3, 4 }
+  };
+  status = saMsgQueueCapacityThresholdsSet(queue, &written);
+  assert(status == SA_AIS_OK);
+
+  SaMsgQueueThresholdsT readBack;
+  status = saMsgQueueCapacityThresholdsGet(queue, &readBack);
+  aisrc_validate(status, SA_AIS_OK);
+
+  assert(memcmp(&written, &readBack, sizeof(SaMsgQueueThresholdsT)) == 0);
+
+  status = saMsgFinalize(handle);
+  assert(status == SA_AIS_OK);
+}
+
+// Expects one SA_MSG_QUEUE_CAPACITY_REACHED notification after a 4-byte
+// message has been sent at every priority to a queue whose thresholds are
+// { 3, 3, 3, 3 } / { 2, 2, 2, 2 }.
+//
+// The dispatch thread is always stopped and joined and the library handle is
+// always finalized, whatever the verdict: destroying a still-joinable
+// std::thread would call std::terminate.
+static void capacityThresholds_21(void) {
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  const SaMsgQueueThresholdsT thresholds = {
+    { 3, 3, 3, 3 }, { 2, 2, 2, 2 }
+  };
+
+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+  assert(rc == SA_AIS_OK);
+
+  SaNtfHandleT ntfHandle;
+  SaSelectionObjectT ntfSelObj;
+  int result(testNtfInit(ntfHandle, ntfSelObj));
+
+  if (result == TET_PASS) {
+    ready = false;
+    // Queue the expectation before the dispatch thread exists.
+    testQueue.push(SA_MSG_QUEUE_CAPACITY_REACHED);
+    std::thread t(&testNtfStateChange, ntfHandle, ntfSelObj);
+
+    /* fill up the q */
+    for (SaUint8T i(SA_MSG_MESSAGE_HIGHEST_PRIORITY);
+         i <= SA_MSG_MESSAGE_LOWEST_PRIORITY;
+         i++) {
+      char data[] = "abcd";
+
+      SaMsgMessageT message = {
+        1, 1, sizeof(data) - 1, 0, data, i
+      };
+
+      rc = saMsgMessageSend(msgHandle, &queueName, &message,
+                            SA_TIME_ONE_SECOND);
+      assert(rc == SA_AIS_OK);
+    }
+
+    // wait for the notifications
+    {
+      std::unique_lock<std::mutex> lk(m);
+      cv.wait(lk, []{ return ready; });
+    }
+
+    result = resultQueue.front();
+    resultQueue.pop();
+
+    // Stop the poller and join it on every path.
+    const int cleanup(testNtfCleanup(ntfHandle));
+    t.join();
+
+    if (result == TET_PASS)
+      result = cleanup;
+  }
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+
+  test_validate(result, TET_PASS);
+
+  // let the queue close
+  sleep(2);
+}
+
+// Expects SA_MSG_QUEUE_CAPACITY_REACHED after the queue has been filled and
+// then SA_MSG_QUEUE_CAPACITY_AVAILABLE after one message has been read back.
+// Thresholds are { 3, 3, 3, 3 } / { 2, 2, 2, 2 }.
+//
+// The dispatch thread is always stopped and joined and the library handle is
+// always finalized, whatever the verdict: destroying a still-joinable
+// std::thread would call std::terminate.
+static void capacityThresholds_22(void) {
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  const SaMsgQueueThresholdsT thresholds = {
+    { 3, 3, 3, 3 }, { 2, 2, 2, 2 }
+  };
+
+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+  assert(rc == SA_AIS_OK);
+
+  SaNtfHandleT ntfHandle;
+  SaSelectionObjectT ntfSelObj;
+  int result(testNtfInit(ntfHandle, ntfSelObj));
+
+  if (result == TET_PASS) {
+    ready = false;
+    // Queue the expectation before the dispatch thread exists.
+    testQueue.push(SA_MSG_QUEUE_CAPACITY_REACHED);
+    std::thread t(&testNtfStateChange, ntfHandle, ntfSelObj);
+
+    /* fill up the q */
+    for (SaUint8T i(SA_MSG_MESSAGE_HIGHEST_PRIORITY);
+         i <= SA_MSG_MESSAGE_LOWEST_PRIORITY;
+         i++) {
+      char data[] = "abcd";
+
+      SaMsgMessageT message = {
+        1, 1, sizeof(data) - 1, 0, data, i
+      };
+
+      rc = saMsgMessageSend(msgHandle, &queueName, &message,
+                            SA_TIME_ONE_SECOND);
+      assert(rc == SA_AIS_OK);
+    }
+
+    // wait for the notifications
+    {
+      std::unique_lock<std::mutex> lk(m);
+      cv.wait(lk, []{ return ready; });
+    }
+
+    result = resultQueue.front();
+    resultQueue.pop();
+
+    // Second phase only makes sense if the first notification was right.
+    if (result == TET_PASS) {
+      ready = false;
+
+      testQueue.push(SA_MSG_QUEUE_CAPACITY_AVAILABLE);
+
+      /* read a message */
+      char data[4];
+      SaMsgMessageT msg;
+      SaMsgSenderIdT senderId;
+      memset(&msg, 0, sizeof(msg));
+      msg.data = &data;
+      msg.size = sizeof(data);
+
+      rc = saMsgMessageGet(queueHandle, &msg, 0, &senderId,
+                           SA_TIME_ONE_SECOND);
+      assert(rc == SA_AIS_OK);
+
+      // wait for the notifications
+      {
+        std::unique_lock<std::mutex> lk(m);
+        cv.wait(lk, []{ return ready; });
+      }
+
+      result = resultQueue.front();
+      resultQueue.pop();
+    }
+
+    // Stop the poller and join it on every path.
+    const int cleanup(testNtfCleanup(ntfHandle));
+    t.join();
+
+    if (result == TET_PASS)
+      result = cleanup;
+  }
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+
+  test_validate(result, TET_PASS);
+
+  // let the queue close
+  sleep(2);
+}
+
+// Expects SA_MSG_QUEUE_CAPACITY_REACHED followed by
+// SA_MSG_QUEUE_GROUP_CAPACITY_REACHED after messages have been sent through a
+// round-robin queue group whose only member is the test queue. Thresholds
+// are { 3, 3, 3, 3 } / { 2, 2, 2, 2 }.
+//
+// Both queued verdicts are always consumed, the dispatch thread is always
+// stopped and joined, and the queue group and library handle are always
+// released, so a failing run leaves nothing behind for later tests.
+static void capacityThresholds_23(void) {
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  const SaMsgQueueThresholdsT thresholds = {
+    { 3, 3, 3, 3 }, { 2, 2, 2, 2 }
+  };
+
+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+  assert(rc == SA_AIS_OK);
+
+  rc = saMsgQueueGroupCreate(msgHandle,
+                             &queueGroupName,
+                             SA_MSG_QUEUE_GROUP_ROUND_ROBIN);
+  assert(rc == SA_AIS_OK);
+
+  rc = saMsgQueueGroupInsert(msgHandle, &queueGroupName, &queueName);
+  assert(rc == SA_AIS_OK);
+
+  SaNtfHandleT ntfHandle;
+  SaSelectionObjectT ntfSelObj;
+  int result(testNtfInit(ntfHandle, ntfSelObj));
+
+  if (result == TET_PASS) {
+    ready = false;
+    // Queue the expectations before the dispatch thread exists.
+    testQueue.push(SA_MSG_QUEUE_CAPACITY_REACHED);
+    testQueue.push(SA_MSG_QUEUE_GROUP_CAPACITY_REACHED);
+    std::thread t(&testNtfStateChange, ntfHandle, ntfSelObj);
+
+    /* fill up the q */
+    for (SaUint8T i(SA_MSG_MESSAGE_HIGHEST_PRIORITY);
+         i <= SA_MSG_MESSAGE_LOWEST_PRIORITY;
+         i++) {
+      char data[] = "abcd";
+
+      SaMsgMessageT message = {
+        1, 1, sizeof(data) - 1, 0, data, i
+      };
+
+      rc = saMsgMessageSend(msgHandle,
+                            &queueGroupName,
+                            &message,
+                            SA_TIME_ONE_SECOND);
+      assert(rc == SA_AIS_OK);
+    }
+
+    // wait for the notifications
+    {
+      std::unique_lock<std::mutex> lk(m);
+      cv.wait(lk, []{ return ready; });
+    }
+
+    // `ready` is set once testQueue is empty, i.e. after both verdicts have
+    // been pushed; drain both so none leaks into the next test.
+    const int queueVerdict(resultQueue.front());
+    resultQueue.pop();
+    const int groupVerdict(resultQueue.front());
+    resultQueue.pop();
+
+    result = (queueVerdict != TET_PASS) ? queueVerdict : groupVerdict;
+
+    // Stop the poller and join it on every path.
+    const int cleanup(testNtfCleanup(ntfHandle));
+    t.join();
+
+    if (result == TET_PASS)
+      result = cleanup;
+  }
+
+  rc = saMsgQueueGroupDelete(msgHandle, &queueGroupName);
+  assert(rc == SA_AIS_OK);
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+
+  test_validate(result, TET_PASS);
+
+  // let the queue close
+  sleep(2);
+}
+
+/*
+ * Test case registered as "Capacity Available Notification sent (group)".
+ *
+ * Opens a queue, sets its capacity thresholds, inserts it into a round-robin
+ * queue group and sends one message per priority so that the queue fills up.
+ * It then pops two results for the queued CAPACITY_REACHED expectations,
+ * reads one message back and pops two results for the CAPACITY_AVAILABLE
+ * expectations. resultQueue is presumably filled by testNtfStateChange() --
+ * verify against its definition.
+ *
+ * NOTE(review): every `break` taken after `t` has been started leaves the
+ * scope with a joinable std::thread, whose destructor calls std::terminate().
+ * Joining on those paths is only safe if testNtfStateChange() is guaranteed
+ * to return; confirm that before changing the error paths.
+ */
+static void capacityThresholds_24(void) {
+  int result(TET_PASS);
+
+  do {
+    SaMsgHandleT msgHandle;
+    SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+    assert(rc == SA_AIS_OK);
+
+    SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+    // first array: 3 for every priority, second array: 2 for every priority
+    const SaMsgQueueThresholdsT thresholds = {
+      { 3, 3, 3, 3 }, { 2, 2, 2, 2 }
+    };
+
+    rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+    assert(rc == SA_AIS_OK);
+
+    rc = saMsgQueueGroupCreate(msgHandle,
+                               &queueGroupName,
+                               SA_MSG_QUEUE_GROUP_ROUND_ROBIN);
+    assert(rc == SA_AIS_OK);
+
+    rc = saMsgQueueGroupInsert(msgHandle, &queueGroupName, &queueName);
+    assert(rc == SA_AIS_OK);
+
+    SaNtfHandleT ntfHandle;
+    SaSelectionObjectT ntfSelObj;
+    result = testNtfInit(ntfHandle, ntfSelObj);
+    if (result != TET_PASS) {
+      break;
+    }
+
+    ready = false;
+    std::thread t(&testNtfStateChange, ntfHandle, ntfSelObj);
+    testQueue.push(SA_MSG_QUEUE_CAPACITY_REACHED);
+    testQueue.push(SA_MSG_QUEUE_GROUP_CAPACITY_REACHED);
+
+    /* fill up the q */
+    for (SaUint8T i(SA_MSG_MESSAGE_HIGHEST_PRIORITY);
+         i <= SA_MSG_MESSAGE_LOWEST_PRIORITY;
+         i++) {
+      char data[] = "abcd";
+
+      SaMsgMessageT message = {
+        1, 1, sizeof(data) - 1, 0, data, i
+      };
+
+      // this case sends to the queue itself, not to the group
+      rc = saMsgMessageSend(msgHandle, &queueName, &message,
+                            SA_TIME_ONE_SECOND);
+      assert(rc == SA_AIS_OK);
+    }
+
+    // wait for the notifications
+    {
+      std::unique_lock<std::mutex> lk(m);
+      cv.wait(lk, []{ return ready; });
+    }
+
+    // one result per expectation pushed on testQueue above
+    result = resultQueue.front();
+    resultQueue.pop();
+    if (result != TET_PASS)
+      break;
+
+    result = resultQueue.front();
+    resultQueue.pop();
+    if (result != TET_PASS)
+      break;
+
+    ready = false;
+
+    testQueue.push(SA_MSG_QUEUE_CAPACITY_AVAILABLE);
+    testQueue.push(SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE);
+
+    /* read a message */
+    char data[4];
+    SaMsgMessageT msg;
+    SaMsgSenderIdT senderId;
+    memset(&msg, 0, sizeof(msg));
+    msg.data = &data;
+    msg.size = sizeof(data);
+
+    rc = saMsgMessageGet(queueHandle, &msg, 0, &senderId, SA_TIME_ONE_SECOND);
+    assert(rc == SA_AIS_OK);
+
+    // wait for the second pair of notifications
+    {
+      std::unique_lock<std::mutex> lk(m);
+      cv.wait(lk, []{ return ready; });
+    }
+
+    result = resultQueue.front();
+    resultQueue.pop();
+    if (result != TET_PASS)
+      break;
+
+    result = resultQueue.front();
+    resultQueue.pop();
+    if (result != TET_PASS)
+      break;
+
+    result = testNtfCleanup(ntfHandle);
+    if (result != TET_PASS) {
+      break;
+    }
+
+    rc = saMsgQueueGroupDelete(msgHandle, &queueGroupName);
+    assert(rc == SA_AIS_OK);
+
+    rc = saMsgFinalize(msgHandle);
+    assert(rc == SA_AIS_OK);
+
+    t.join();
+  } while (false);
+
+  test_validate(result, TET_PASS);
+
+  // let the queue close
+  sleep(2);
+}
+
+/*
+ * Runs at load time (GCC constructor attribute) and registers test suite 27
+ * together with its 24 test cases. Cases 01-14 cover
+ * saMsgQueueCapacityThresholdsSet, 15-20 cover
+ * saMsgQueueCapacityThresholdsGet and 21-24 cover the capacity notifications.
+ */
+__attribute__((constructor)) static void capacityThresholds_constructor(void) {
+  test_suite_add(27, "Capacity Thresholds Test Suite");
+  test_case_add(27,
+                capacityThresholds_01,
+                "saMsgQueueCapacityThresholdsSet returns BAD_HANDLE when "
+                "queueHandle is bad");
+  test_case_add(27,
+                capacityThresholds_02,
+                "saMsgQueueCapacityThresholdsSet with finalized handle");
+  test_case_add(27,
+                capacityThresholds_03,
+                "saMsgQueueCapacityThresholdsSet with closed queue");
+  test_case_add(27,
+                capacityThresholds_04,
+                "saMsgQueueCapacityThresholdsSet with null thresholds "
+                "pointer");
+  test_case_add(27,
+                capacityThresholds_05,
+                "saMsgQueueCapacityThresholdsSet with capacityAvailable > "
+                "capacityReached (1)");
+  test_case_add(27,
+                capacityThresholds_06,
+                "saMsgQueueCapacityThresholdsSet with capacityAvailable > "
+                "capacityReached (2)");
+  test_case_add(27,
+                capacityThresholds_07,
+                "saMsgQueueCapacityThresholdsSet with capacityAvailable > "
+                "capacityReached (3)");
+  test_case_add(27,
+                capacityThresholds_08,
+                "saMsgQueueCapacityThresholdsSet with capacityAvailable > "
+                "capacityReached (4)");
+  test_case_add(27,
+                capacityThresholds_09,
+                "saMsgQueueCapacityThresholdsSet with version != B.03.01");
+  test_case_add(27,
+                capacityThresholds_10,
+                "saMsgQueueCapacityThresholdsSet with size < capacityReached "
+                "(1)");
+  test_case_add(27,
+                capacityThresholds_11,
+                "saMsgQueueCapacityThresholdsSet with size < capacityReached "
+                "(2)");
+  test_case_add(27,
+                capacityThresholds_12,
+                "saMsgQueueCapacityThresholdsSet with size < capacityReached "
+                "(3)");
+  test_case_add(27,
+                capacityThresholds_13,
+                "saMsgQueueCapacityThresholdsSet with size < capacityReached "
+                "(4)");
+  test_case_add(27,
+                capacityThresholds_14,
+                "saMsgQueueCapacityThresholdsSet success");
+  test_case_add(27,
+                capacityThresholds_15,
+                "saMsgQueueCapacityThresholdsGet returns BAD_HANDLE when "
+                "queueHandle is bad");
+  test_case_add(27,
+                capacityThresholds_16,
+                "saMsgQueueCapacityThresholdsGet with finalized handle");
+  test_case_add(27,
+                capacityThresholds_17,
+                "saMsgQueueCapacityThresholdsGet with closed queue");
+  test_case_add(27,
+                capacityThresholds_18,
+                "saMsgQueueCapacityThresholdsGet with null thresholds "
+                "pointer");
+  test_case_add(27,
+                capacityThresholds_19,
+                "saMsgQueueCapacityThresholdsGet with version != B.03.01");
+  test_case_add(27,
+                capacityThresholds_20,
+                "saMsgQueueCapacityThresholdsGet success");
+  test_case_add(27,
+                capacityThresholds_21,
+                "Capacity Reached Notification sent");
+  test_case_add(27,
+                capacityThresholds_22,
+                "Capacity Available Notification sent");
+  test_case_add(27,
+                capacityThresholds_23,
+                "Capacity Reached Notification sent (group)");
+  test_case_add(27,
+                capacityThresholds_24,
+                "Capacity Available Notification sent (group)");
+}
diff --git a/src/msg/apitest/test_ErrUnavailable.cc b/src/msg/apitest/test_ErrUnavailable.cc
index 3f37f8d..dc1b371 100644
--- a/src/msg/apitest/test_ErrUnavailable.cc
+++ b/src/msg/apitest/test_ErrUnavailable.cc
@@ -504,6 +504,59 @@ static void saErrUnavailable_25(void)
assert(rc == SA_AIS_OK);
}
+/*
+ * saMsgQueueCapacityThresholdsSet must return SA_AIS_ERR_UNAVAILABLE when it
+ * is called between lockUnlockNode(true) and lockUnlockNode(false).
+ */
+static void saErrUnavailable_26(void)
+{
+	/* every threshold entry is 5 */
+	const SaMsgQueueThresholdsT limits = {
+		{ 5, 5, 5, 5 },
+		{ 5, 5, 5, 5 }
+	};
+	SaMsgHandleT handle;
+	SaMsgQueueHandleT queue;
+
+	SaAisErrorT status = saMsgInitialize(&handle, 0, &msg3_1);
+	assert(status == SA_AIS_OK);
+
+	status = saMsgQueueOpen(handle, &queueName, &creationAttributes,
+				SA_MSG_QUEUE_CREATE, SA_TIME_END, &queue);
+	assert(status == SA_AIS_OK);
+
+	/* the call under test runs inside the lockUnlockNode(true/false) pair */
+	lockUnlockNode(true);
+	status = saMsgQueueCapacityThresholdsSet(queue, &limits);
+	test_validate(status, SA_AIS_ERR_UNAVAILABLE);
+	lockUnlockNode(false);
+
+	status = saMsgFinalize(handle);
+	assert(status == SA_AIS_OK);
+}
+
+/*
+ * saMsgQueueCapacityThresholdsGet must return SA_AIS_ERR_UNAVAILABLE when it
+ * is called between lockUnlockNode(true) and lockUnlockNode(false).
+ */
+static void saErrUnavailable_27(void)
+{
+	SaMsgHandleT handle;
+	SaMsgQueueHandleT queue;
+	SaMsgQueueThresholdsT limits;
+
+	SaAisErrorT status = saMsgInitialize(&handle, 0, &msg3_1);
+	assert(status == SA_AIS_OK);
+
+	status = saMsgQueueOpen(handle, &queueName, &creationAttributes,
+				SA_MSG_QUEUE_CREATE, SA_TIME_END, &queue);
+	assert(status == SA_AIS_OK);
+
+	/* the call under test runs inside the lockUnlockNode(true/false) pair */
+	lockUnlockNode(true);
+	status = saMsgQueueCapacityThresholdsGet(queue, &limits);
+	test_validate(status, SA_AIS_ERR_UNAVAILABLE);
+	lockUnlockNode(false);
+
+	status = saMsgFinalize(handle);
+	assert(status == SA_AIS_OK);
+}
+
static void saErrUnavailable_30(void)
{
lockUnlockNode(true);
@@ -940,6 +993,59 @@ static void saErrUnavailable_54(void)
assert(rc == SA_AIS_OK);
}
+/*
+ * "Stale" variant: the queue is opened first, then lockUnlockNode(true) and
+ * lockUnlockNode(false) are both called, and only afterwards is
+ * saMsgQueueCapacityThresholdsSet invoked on the old queue handle. The call
+ * is expected to return SA_AIS_ERR_UNAVAILABLE.
+ */
+static void saErrUnavailable_55(void)
+{
+	/* every threshold entry is 5 */
+	const SaMsgQueueThresholdsT limits = {
+		{ 5, 5, 5, 5 },
+		{ 5, 5, 5, 5 }
+	};
+	SaMsgHandleT handle;
+	SaMsgQueueHandleT queue;
+
+	SaAisErrorT status = saMsgInitialize(&handle, 0, &msg3_1);
+	assert(status == SA_AIS_OK);
+
+	status = saMsgQueueOpen(handle, &queueName, &creationAttributes,
+				SA_MSG_QUEUE_CREATE, SA_TIME_END, &queue);
+	assert(status == SA_AIS_OK);
+
+	/* complete the true/false cycle before using the handle again */
+	lockUnlockNode(true);
+	lockUnlockNode(false);
+
+	status = saMsgQueueCapacityThresholdsSet(queue, &limits);
+	test_validate(status, SA_AIS_ERR_UNAVAILABLE);
+
+	status = saMsgFinalize(handle);
+	assert(status == SA_AIS_OK);
+}
+
+/*
+ * "Stale" variant: the queue is opened first, then lockUnlockNode(true) and
+ * lockUnlockNode(false) are both called, and only afterwards is
+ * saMsgQueueCapacityThresholdsGet invoked on the old queue handle. The call
+ * is expected to return SA_AIS_ERR_UNAVAILABLE.
+ */
+static void saErrUnavailable_56(void)
+{
+	SaMsgHandleT handle;
+	SaMsgQueueHandleT queue;
+	SaMsgQueueThresholdsT limits;
+
+	SaAisErrorT status = saMsgInitialize(&handle, 0, &msg3_1);
+	assert(status == SA_AIS_OK);
+
+	status = saMsgQueueOpen(handle, &queueName, &creationAttributes,
+				SA_MSG_QUEUE_CREATE, SA_TIME_END, &queue);
+	assert(status == SA_AIS_OK);
+
+	/* complete the true/false cycle before using the handle again */
+	lockUnlockNode(true);
+	lockUnlockNode(false);
+
+	status = saMsgQueueCapacityThresholdsGet(queue, &limits);
+	test_validate(status, SA_AIS_ERR_UNAVAILABLE);
+
+	status = saMsgFinalize(handle);
+	assert(status == SA_AIS_OK);
+}
+
__attribute__((constructor)) static void saErrUnavailable_constructor(void)
{
@@ -969,9 +1075,9 @@ __attribute__((constructor)) static void saErrUnavailable_constructor(void)
test_case_add(26, saErrUnavailable_23, "saMsgMessageSendReceive");
test_case_add(26, saErrUnavailable_24, "saMsgMessageReply");
test_case_add(26, saErrUnavailable_25, "saMsgMessageReplyAsync");
-#if 0
test_case_add(26, saErrUnavailable_26, "saMsgQueueCapacityThresholdsSet");
test_case_add(26, saErrUnavailable_27, "saMsgQueueCapacityThresholdsGet");
+#if 0
test_case_add(26, saErrUnavailable_28, "saMsgMetadataSizeGet");
test_case_add(26, saErrUnavailable_29, "saMsgLimitGet");
#endif
@@ -1000,9 +1106,9 @@ __attribute__((constructor)) static void saErrUnavailable_constructor(void)
test_case_add(26, saErrUnavailable_52, "saMsgMessageSendReceive (stale)");
test_case_add(26, saErrUnavailable_53, "saMsgMessageReply (stale)");
test_case_add(26, saErrUnavailable_54, "saMsgMessageReplyAsync (stale)");
-#if 0
test_case_add(26, saErrUnavailable_55, "saMsgQueueCapacityThresholdsSet
(stale)");
test_case_add(26, saErrUnavailable_56, "saMsgQueueCapacityThresholdsGet
(stale)");
+#if 0
test_case_add(26, saErrUnavailable_57, "saMsgMetadataSizeGet (stale)");
test_case_add(26, saErrUnavailable_58, "saMsgLimitGet (stale)");
#endif
diff --git a/src/msg/common/mqsv_edu.c b/src/msg/common/mqsv_edu.c
index 7b0749b..a5301da 100644
--- a/src/msg/common/mqsv_edu.c
+++ b/src/msg/common/mqsv_edu.c
@@ -477,7 +477,8 @@ static int mqsv_mqp_req_test_type_fnc(NCSCONTEXT arg)
LCL_TEST_JUMP_OFFSET_MQP_EVT_TRANSFER_QUEUE_COMPLETE = 60,
LCL_TEST_JUMP_OFFSET_MQP_EVT_UPDATE_STATS = 62,
LCL_TEST_JUMP_OFFSET_MQP_EVT_RET_TIME_SET_REQ = 65,
- LCL_TEST_JUMP_OFFSET_MQP_EVT_CLM_NOTIFY = 67 };
+ LCL_TEST_JUMP_OFFSET_MQP_EVT_CLM_NOTIFY = 67,
+ LCL_TEST_JUMP_OFFSET_MQP_EVT_CAPACITY = 68 };
MQP_REQ_TYPE type;
if (arg == NULL)
@@ -516,6 +517,9 @@ static int mqsv_mqp_req_test_type_fnc(NCSCONTEXT arg)
return LCL_TEST_JUMP_OFFSET_MQP_EVT_RET_TIME_SET_REQ;
case MQP_EVT_CLM_NOTIFY:
return LCL_TEST_JUMP_OFFSET_MQP_EVT_CLM_NOTIFY;
+ case MQP_EVT_CAP_SET_REQ:
+ case MQP_EVT_CAP_GET_REQ:
+ return LCL_TEST_JUMP_OFFSET_MQP_EVT_CAPACITY;
default:
break;
@@ -578,6 +582,55 @@ static uint32_t mqsv_edp_samsgqueuecreationattributest(
/*****************************************************************************
+ PROCEDURE NAME: mqsv_edp_samsgqueuethresholdst
+
+ DESCRIPTION: EDU program handler for "SaMsgQueueThresholdsT" data. This
+function is invoked by EDU for performing encode/decode operation on
+"SaMsgQueueThresholdsT" data.
+
+ RETURNS: NCSCC_RC_SUCCESS/NCSCC_RC_FAILURE
+
+*****************************************************************************/
+static uint32_t mqsv_edp_samsgqueuethresholdst(
+	EDU_HDL *hdl, EDU_TKN *edu_tkn, NCSCONTEXT ptr, uint32_t *ptr_data_len,
+	EDU_BUF_ENV *buf_env, EDP_OP_TYPE op, EDU_ERR *o_err)
+{
+	SaMsgQueueThresholdsT *thresholds = NULL;
+
+	/*
+	 * Both members are arrays with one SaSizeT entry per message
+	 * priority (SA_MSG_MESSAGE_LOWEST_PRIORITY + 1 entries each).
+	 */
+	EDU_INST_SET thresholds_rules[] = {
+		{EDU_START, mqsv_edp_samsgqueuethresholdst, 0, 0, 0,
+		 sizeof(SaMsgQueueThresholdsT), 0, NULL},
+		{EDU_EXEC, m_NCS_EDP_SASIZET, EDQ_ARRAY, 0, 0,
+		 (long)&((SaMsgQueueThresholdsT *)0)->capacityReached,
+		 SA_MSG_MESSAGE_LOWEST_PRIORITY + 1, NULL},
+		{EDU_EXEC, m_NCS_EDP_SASIZET, EDQ_ARRAY, 0, 0,
+		 (long)&((SaMsgQueueThresholdsT *)0)->capacityAvailable,
+		 SA_MSG_MESSAGE_LOWEST_PRIORITY + 1, NULL},
+		{EDU_END, 0, 0, 0, 0, 0, 0, NULL},
+	};
+
+	if (op == EDP_OP_TYPE_DEC) {
+		/* on decode, ptr is the address of the destination pointer */
+		SaMsgQueueThresholdsT **dest = (SaMsgQueueThresholdsT **)ptr;
+
+		if (*dest == NULL) {
+			/* This should have already been a valid pointer. */
+			*o_err = EDU_ERR_MEM_FAIL;
+			return NCSCC_RC_FAILURE;
+		}
+		memset(*dest, '\0', sizeof(SaMsgQueueThresholdsT));
+		thresholds = *dest;
+	} else {
+		/* encode and every other operation use ptr directly */
+		thresholds = (SaMsgQueueThresholdsT *)ptr;
+	}
+
+	return m_NCS_EDU_RUN_RULES(hdl, edu_tkn, thresholds_rules, thresholds,
+				   ptr_data_len, buf_env, op, o_err);
+}
+
+/*****************************************************************************
+
PROCEDURE NAME: mqsv_edp_mqp_req
DESCRIPTION: EDU program handler for "MQP_REQ_MSG" data. This
@@ -802,6 +855,13 @@ static uint32_t mqsv_edp_mqp_req(EDU_HDL *hdl, EDU_TKN *edu_tkn, NCSCONTEXT ptr,
{EDU_EXEC, ncs_edp_uns32, 0, 0, 0,
(long)&((MQP_REQ_MSG *)0)->info.clmNotify.node_joined, 0, NULL},
+ /* MQP_EVT_CAP_REQ */
+ {EDU_EXEC, m_NCS_EDP_SAMSGQUEUEHANDLET, 0, 0, 0,
+ (long)&((MQP_REQ_MSG *)0)->info.capacity.queueHandle, 0, NULL},
+ {EDU_EXEC, mqsv_edp_samsgqueuethresholdst, 0, 0, 0,
+ (long)&((MQP_REQ_MSG *)0)->info.capacity.thresholds,
+ 0, NULL},
+
{EDU_END, 0, 0, 0, 0, 0, 0, NULL},
};
@@ -956,7 +1016,7 @@ static int mqsv_mqp_rsp_test_type_fnc(NCSCONTEXT arg)
LCL_TEST_JUMP_OFFSET_MQP_EVT_TRANSFER_QUEUE_RSP,
LCL_TEST_JUMP_OFFSET_MQP_EVT_ND_RESTART_RSP = 29,
LCL_TEST_JUMP_OFFSET_MQP_EVT_RET_TIME_SET_RSP,
-
+ LCL_TEST_JUMP_OFFSET_MQP_EVT_CAP_RSP = 32
};
MQP_RSP_TYPE type;
@@ -988,6 +1048,9 @@ static int mqsv_mqp_rsp_test_type_fnc(NCSCONTEXT arg)
return LCL_TEST_JUMP_OFFSET_MQP_EVT_ND_RESTART_RSP;
case MQP_EVT_Q_RET_TIME_SET_RSP:
return LCL_TEST_JUMP_OFFSET_MQP_EVT_RET_TIME_SET_RSP;
+ case MQP_EVT_CAP_SET_RSP:
+ case MQP_EVT_CAP_GET_RSP:
+ return LCL_TEST_JUMP_OFFSET_MQP_EVT_CAP_RSP;
default:
break;
}
@@ -1109,6 +1172,13 @@ static uint32_t mqsv_edp_mqp_rsp(EDU_HDL *hdl, EDU_TKN *edu_tkn, NCSCONTEXT ptr,
(long)&((MQP_RSP_MSG *)0)->info.retTimeSetRsp.queueHandle, 0,
NULL},
+ /* MQP_EVT_CAP_RSP */
+ {EDU_EXEC, m_NCS_EDP_SAMSGQUEUEHANDLET, 0, 0, 0,
+ (long)&((MQP_RSP_MSG *)0)->info.capacity.queueHandle, 0, NULL},
+ {EDU_EXEC, mqsv_edp_samsgqueuethresholdst, 0, 0, 0,
+ (long)&((MQP_RSP_MSG *)0)->info.capacity.thresholds,
+ 0, NULL},
+
{EDU_END, 0, 0, 0, 0, 0, 0, NULL},
};
diff --git a/src/msg/common/mqsv_evt.h b/src/msg/common/mqsv_evt.h
index ef7c739..4e26bf6 100644
--- a/src/msg/common/mqsv_evt.h
+++ b/src/msg/common/mqsv_evt.h
@@ -62,7 +62,9 @@ typedef enum mqp_req_type {
MQP_EVT_SEND_MSG_ASYNC,
MQP_EVT_CB_DUMP, /* For CPND CB Dump */
MQP_EVT_Q_RET_TIME_SET_REQ,
- MQP_EVT_CLM_NOTIFY
+ MQP_EVT_CLM_NOTIFY,
+ MQP_EVT_CAP_SET_REQ,
+ MQP_EVT_CAP_GET_REQ
} MQP_REQ_TYPE;
/* Enums for MQP Message Types */
@@ -81,7 +83,9 @@ typedef enum mqp_rsp_type {
MQP_EVT_TRANSFER_QUEUE_RSP,
MQP_EVT_MQND_RESTART_RSP,
MQP_EVT_STAT_UPD_RSP,
- MQP_EVT_Q_RET_TIME_SET_RSP
+ MQP_EVT_Q_RET_TIME_SET_RSP,
+ MQP_EVT_CAP_SET_RSP,
+ MQP_EVT_CAP_GET_RSP
} MQP_RSP_TYPE;
/* Enums for MQD messages */
@@ -188,6 +192,11 @@ typedef struct mqp_q_ret_time_set_rsp {
SaMsgQueueHandleT queueHandle;
} MQP_Q_RET_TIME_SET_RSP;
+typedef struct mqp_capacity { /* payload of the capacity set/get request and response messages */
+ SaMsgQueueHandleT queueHandle; /* handle of the queue the thresholds belong to */
+ SaMsgQueueThresholdsT thresholds; /* capacityReached / capacityAvailable arrays */
+} MQP_CAPACITY;
+
/* saMsgQueueOpen(): */
typedef struct mqp_open_req {
@@ -396,6 +405,7 @@ typedef struct mqp_req_msg {
MQP_UPDATE_STATS statsReq;
MQP_Q_RET_TIME_SET_REQ retTimeSetReq;
MQP_CLM_NOTIFY clmNotify;
+ MQP_CAPACITY capacity;
} info;
} MQP_REQ_MSG;
@@ -415,6 +425,7 @@ typedef struct mqp_rsp_msg {
MQP_QUEUE_REPLY_RSP replyRsp;
MQP_SEND_MSG_RSP sendMsgRsp;
MQP_Q_RET_TIME_SET_RSP retTimeSetRsp;
+ MQP_CAPACITY capacity;
} info;
} MQP_RSP_MSG;
diff --git a/src/msg/msgd/mqd_api.c b/src/msg/msgd/mqd_api.c
index 8416851..83d5c21 100644
--- a/src/msg/msgd/mqd_api.c
+++ b/src/msg/msgd/mqd_api.c
@@ -49,10 +49,11 @@
#include "msg/msgd/mqd.h"
#include "mqd_imm.h"
+#include "mqd_ntf.h"
MQDLIB_INFO gl_mqdinfo;
-enum { FD_TERM = 0, FD_AMF, FD_MBCSV, FD_MBX, FD_CLM, NUM_FD };
+enum { FD_TERM = 0, FD_AMF, FD_MBCSV, FD_MBX, FD_CLM, FD_NTF, NUM_FD };
static struct pollfd fds[NUM_FD];
static nfds_t nfds = NUM_FD;
@@ -146,12 +147,50 @@ static SaAisErrorT mqd_clm_init(MQD_CB *cb)
TRACE_1("saClmClusterTrack success");
} while (false);
- if (saErr != SA_AIS_OK && !cb->clm_hdl)
+ if (saErr != SA_AIS_OK && cb->clm_hdl)
saClmFinalize(cb->clm_hdl);
return saErr;
}
+/*
+ * Set up MQD's NTF session: initialise the NTF library with
+ * mqdNtfNotificationCallback as the notification callback, fetch the
+ * selection object into cb->ntf_sel_obj and install the subscriptions via
+ * mqdInitNtfSubscriptions(). If any step fails and cb->ntfHandle is non-zero,
+ * the handle is finalized before returning the error.
+ */
+static SaAisErrorT mqd_ntf_init(MQD_CB *cb)
+{
+	SaVersionT ntfVersion = { 'A', 1, 1 };
+	SaNtfCallbacksT callbacks = {
+		mqdNtfNotificationCallback,
+		0
+	};
+	SaAisErrorT rc = SA_AIS_OK;
+
+	TRACE_ENTER();
+
+	rc = saNtfInitialize(&cb->ntfHandle, &callbacks, &ntfVersion);
+	if (rc != SA_AIS_OK) {
+		LOG_ER("saNtfInitialize failed with error %u", rc);
+	} else {
+		rc = saNtfSelectionObjectGet(cb->ntfHandle, &cb->ntf_sel_obj);
+		if (rc != SA_AIS_OK) {
+			LOG_ER("saNtfSelectionObjectGet failed with error %u",
+			       rc);
+		} else {
+			/* this step does its own error logging */
+			rc = mqdInitNtfSubscriptions(cb->ntfHandle);
+		}
+	}
+
+	/* undo a partial initialisation */
+	if (rc != SA_AIS_OK && cb->ntfHandle)
+		saNtfFinalize(cb->ntfHandle);
+
+	TRACE_LEAVE();
+
+	return rc;
+}
+
/****************************************************************************\
PROCEDURE NAME : mqd_lib_init
@@ -306,6 +345,16 @@ static uint32_t mqd_lib_init(void)
mqd_cb_shut(pMqd);
return NCSCC_RC_FAILURE;
}
+
+ if (mqd_ntf_init(pMqd) != SA_AIS_OK) {
+ mqd_asapi_unbind();
+ saAmfFinalize(pMqd->amf_hdl);
+ saClmFinalize(pMqd->clm_hdl);
+ ncshm_give_hdl(pMqd->hdl);
+ mqd_cb_shut(pMqd);
+ return NCSCC_RC_FAILURE;
+ }
+
if ((rc = initialize_for_assignment(pMqd, pMqd->ha_state)) !=
NCSCC_RC_SUCCESS) {
LOG_ER("initialize_for_assignment FAILED %u", (unsigned)rc);
@@ -493,6 +542,8 @@ void mqd_main_process(uint32_t hdl)
fds[FD_CLM].events = POLLIN;
fds[FD_MBCSV].fd = pMqd->mbcsv_sel_obj;
fds[FD_MBCSV].events = POLLIN;
+ fds[FD_NTF].fd = pMqd->ntf_sel_obj;
+ fds[FD_NTF].events = POLLIN;
int ret = poll(fds, nfds, -1);
if (ret == -1) {
@@ -522,6 +573,15 @@ void mqd_main_process(uint32_t hdl)
LOG_ER("saClmDispatch failed: %u", err);
}
}
+
+ if (fds[FD_NTF].revents & POLLIN) {
+ /* dispatch all the NTF pending function */
+ err = saNtfDispatch(pMqd->ntfHandle, SA_DISPATCH_ALL);
+ if (err != SA_AIS_OK) {
+ LOG_ER("saNtfDispatch failed: %u", err);
+ }
+ }
+
/* dispatch all the MBCSV pending callbacks */
if (fds[FD_MBCSV].revents & POLLIN) {
mbcsv_arg.i_op = NCS_MBCSV_OP_DISPATCH;
diff --git a/src/msg/msgd/mqd_db.h b/src/msg/msgd/mqd_db.h
index 022b03a..93dc18f 100644
--- a/src/msg/msgd/mqd_db.h
+++ b/src/msg/msgd/mqd_db.h
@@ -34,6 +34,7 @@
#include <stdbool.h>
#include <saClm.h>
#include <saImmOi.h>
+#include <saNtf.h>
#define MQSV_MQD_MBCSV_VERSION 1
#define MQSV_MQD_MBCSV_VERSION_MIN 1
@@ -91,6 +92,7 @@ typedef struct mqd_qinfo {
NCS_QUEUE ilist; /* Queue/Group element list */
NCS_QUEUE tlist; /* List of the user who opted track */
SaTimeT creationTime;
+ bool capacityReached;
} MQD_OBJ_INFO;
typedef struct mqd_object_elem {
@@ -146,6 +148,7 @@ typedef struct mqd_cb {
SaClmHandleT clm_hdl;
SaAmfHAStateT ha_state; /* Present AMF HA state of the component */
SaNameT comp_name;
+ SaNtfHandleT ntfHandle;
NCS_MBCSV_HDL mbcsv_hdl; /*MBCSV handle for Redundancy of initialize
*/
NCS_MBCSV_CKPT_HDL o_ckpt_hdl; /*Opened Checkpoint Handle */
@@ -171,6 +174,7 @@ typedef struct mqd_cb {
SaSelectionObjectT imm_sel_obj; /*Selection object to wait for
IMM events */
SaSelectionObjectT clm_sel_obj;
+ SaSelectionObjectT ntf_sel_obj;
bool fully_initialized;
} MQD_CB;
diff --git a/src/msg/msgd/mqd_ntf.cc b/src/msg/msgd/mqd_ntf.cc
new file mode 100644
index 0000000..ab29193
--- /dev/null
+++ b/src/msg/msgd/mqd_ntf.cc
@@ -0,0 +1,310 @@
+/* -*- OpenSAF -*-
+ *
+ * (C) Copyright 2017 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
+ * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Genband
+ *
+ */
+#include <cstring>
+#include <vector>
+#include <saMsg.h>
+#include "base/ncs_edu_pub.h"
+#include "base/logtrace.h"
+#include "base/ncs_queue.h"
+#include "base/ncspatricia.h"
+#include "base/ncssysf_tmr.h"
+#include "mbc/mbcsv_papi.h"
+#include "mds/mds_papi.h"
+#include "msg/common/mqsv_def.h"
+#include "msg/common/mqsv_asapi.h"
+#include "mqd_tmr.h"
+#include "mqd_db.h"
+#include "mqd_dl_api.h"
+#include "mqd_ntf.h"
+
+extern MQDLIB_INFO gl_mqdinfo;
+
+static const int operStateSubId = 1;
+
+/*
+ * Send an NTF object state-change notification for a queue group.
+ *
+ * cb         - MQD control block; cb->ntfHandle is used to allocate the
+ *              notification
+ * queueGroup - name copied into the notificationObject field
+ * status     - capacity status; the per-queue values
+ *              SA_MSG_QUEUE_CAPACITY_REACHED / _AVAILABLE are mapped to their
+ *              SA_MSG_QUEUE_GROUP_* counterparts before sending
+ *
+ * Failures are logged; nothing is returned to the caller.
+ *
+ * NOTE(review): the notification is allocated with one correlated
+ * notification slot (third argument) that is never filled in -- confirm
+ * whether 0 was intended.
+ */
+static void sendStateChangeNotification(MQD_CB *cb,
+                                        const SaNameT& queueGroup,
+                                        SaUint16T status) {
+  TRACE_ENTER();
+
+  do {
+    // translate queue-level status into the group-level equivalent
+    if (status == SA_MSG_QUEUE_CAPACITY_REACHED)
+      status = SA_MSG_QUEUE_GROUP_CAPACITY_REACHED;
+    else if (status == SA_MSG_QUEUE_CAPACITY_AVAILABLE)
+      status = SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE;
+
+    const SaNameT notifyingObject = {
+      sizeof("safApp=safMsgService") - 1,
+      "safApp=safMsgService"
+    };
+
+    SaNtfStateChangeNotificationT ntfStateChange = { 0 };
+
+    SaAisErrorT rc(saNtfStateChangeNotificationAllocate(
+        cb->ntfHandle,
+        &ntfStateChange,
+        1,    // numCorrelatedNotifications
+        0,    // lengthAdditionalText
+        0,    // numAdditionalInfo
+        1,    // numStateChanges
+        0));  // variableDataSize
+
+    if (rc != SA_AIS_OK) {
+      LOG_ER("saNtfStateChangeNotificationAllocate failed: %i", rc);
+      break;
+    }
+
+    *ntfStateChange.notificationHeader.eventType = SA_NTF_OBJECT_STATE_CHANGE;
+
+    memcpy(ntfStateChange.notificationHeader.notificationObject,
+           &queueGroup,
+           sizeof(SaNameT));
+
+    memcpy(ntfStateChange.notificationHeader.notifyingObject,
+           &notifyingObject,
+           sizeof(SaNameT));
+
+    ntfStateChange.notificationHeader.notificationClassId->vendorId =
+      SA_NTF_VENDOR_ID_SAF;
+    ntfStateChange.notificationHeader.notificationClassId->majorId =
+      SA_SVC_MSG;
+    // minor id 0x67 is used for "reached", 0x68 for every other status
+    ntfStateChange.notificationHeader.notificationClassId->minorId =
+      (status == SA_MSG_QUEUE_GROUP_CAPACITY_REACHED) ? 0x67 : 0x68;
+
+    *ntfStateChange.sourceIndicator = SA_NTF_OBJECT_OPERATION;
+
+    ntfStateChange.changedStates[0].stateId = SA_MSG_DEST_CAPACITY_STATUS;
+
+    // an old state is only reported when the status is not "reached"
+    ntfStateChange.changedStates[0].oldStatePresent =
+      (status == SA_MSG_QUEUE_GROUP_CAPACITY_REACHED) ? SA_FALSE : SA_TRUE;
+
+    if (status == SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE) {
+      ntfStateChange.changedStates[0].oldState =
+        SA_MSG_QUEUE_GROUP_CAPACITY_REACHED;
+    }
+
+    ntfStateChange.changedStates[0].newState = status;
+
+    rc = saNtfNotificationSend(ntfStateChange.notificationHandle);
+
+    if (rc != SA_AIS_OK)
+      LOG_ER("saNtfNotificationSend failed: %i", rc);
+
+    // the notification is freed whether or not the send succeeded
+    rc = saNtfNotificationFree(ntfStateChange.notificationHandle);
+
+    if (rc != SA_AIS_OK)
+      LOG_ER("saNtfNotificationFree failed: %i", rc);
+  } while (false);
+
+  TRACE_LEAVE();
+}
+
+/*
+ * React to a capacity status change of one queue.
+ *
+ * Walks every queue group in cb->qdb. In each group that contains queueName
+ * the member's capacityReached flag is updated from status. A group
+ * notification is sent for every such group whose other queue members all
+ * have capacityReached set:
+ *   - on SA_MSG_QUEUE_CAPACITY_REACHED the whole group has just become full;
+ *   - on SA_MSG_QUEUE_CAPACITY_AVAILABLE the group was full until now.
+ *
+ * Each group is evaluated on its own, so a non-full queue in one group does
+ * not suppress the notification for another group. Any other status value
+ * changes nothing and sends nothing.
+ */
+static void updateQueueGroup(MQD_CB *cb,
+                             const SaNameT& queueName,
+                             SaUint16T status) {
+  TRACE_ENTER();
+
+  typedef std::vector<SaNameT> GroupList;
+  GroupList groupList;
+
+  // maybe there is a better way to do this?
+  for (MQD_OBJ_NODE *objNode(reinterpret_cast<MQD_OBJ_NODE *>
+         (ncs_patricia_tree_getnext(&cb->qdb, 0)));
+       objNode;
+       objNode = reinterpret_cast<MQD_OBJ_NODE *>(ncs_patricia_tree_getnext(
+         &cb->qdb,
+         reinterpret_cast<uint8_t *>(&objNode->oinfo.name)))) {
+    if (objNode->oinfo.type != MQSV_OBJ_QGROUP)
+      continue;
+
+    // per-group state: is the queue a member, are all other members full?
+    bool member(false), othersFull(true);
+    NCS_Q_ITR itr = { 0 };
+
+    for (MQD_OBJECT_ELEM *objElem(static_cast<MQD_OBJECT_ELEM *>
+           (ncs_queue_get_next(&objNode->oinfo.ilist, &itr)));
+         objElem;
+         objElem = static_cast<MQD_OBJECT_ELEM *>
+           (ncs_queue_get_next(&objNode->oinfo.ilist, &itr))) {
+      if (objElem->pObject->type != MQSV_OBJ_QUEUE)
+        continue;
+
+      if (queueName.length == objElem->pObject->name.length &&
+          !memcmp(queueName.value,
+                  objElem->pObject->name.value,
+                  queueName.length)) {
+        member = true;
+        if (status == SA_MSG_QUEUE_CAPACITY_REACHED)
+          objElem->pObject->capacityReached = true;
+        else if (status == SA_MSG_QUEUE_CAPACITY_AVAILABLE)
+          objElem->pObject->capacityReached = false;
+      } else if (!objElem->pObject->capacityReached) {
+        othersFull = false;
+      }
+    }
+
+    if (member && othersFull)
+      groupList.push_back(objNode->oinfo.name);
+  }
+
+  // notifications go out only after the walk over the database has finished
+  if (status == SA_MSG_QUEUE_CAPACITY_REACHED ||
+      status == SA_MSG_QUEUE_CAPACITY_AVAILABLE) {
+    for (const auto& group : groupList)
+      sendStateChangeNotification(cb, group, status);
+  } else {
+    // don't care about anything else
+  }
+
+  TRACE_LEAVE();
+}
+
+/*
+ * Returns true when the class id carries the SAF vendor id together with the
+ * MSG service major id, false otherwise.
+ */
+static bool isMSGStateChange(const SaNtfClassIdT &classId) {
+  TRACE_ENTER();
+
+  const bool fromMsgService = classId.vendorId == SA_NTF_VENDOR_ID_SAF &&
+                              classId.majorId == SA_SVC_MSG;
+
+  TRACE_LEAVE2("%i", fromMsgService);
+  return fromMsgService;
+}
+
+/*
+ * Validate a MSG state-change notification and, when it is a single
+ * SA_MSG_DEST_CAPACITY_STATUS change caused by an object operation, pass the
+ * notification object and the new state on to updateQueueGroup(). Anything
+ * else is logged as an error and dropped.
+ */
+static void handleMsgObjectStateChangeNotification(
+    MQD_CB *cb,
+    const SaNtfStateChangeNotificationT& stateChangeNotification) {
+  TRACE_ENTER();
+
+  const SaNtfStateChangeNotificationT& ntf = stateChangeNotification;
+
+  if (*ntf.sourceIndicator != SA_NTF_OBJECT_OPERATION) {
+    LOG_ER("expected object operation -- got %i", *ntf.sourceIndicator);
+  } else if (ntf.numStateChanges != 1) {
+    LOG_ER("expected one state change -- got %i", ntf.numStateChanges);
+  } else if (ntf.changedStates[0].stateId != SA_MSG_DEST_CAPACITY_STATUS) {
+    LOG_ER("unknown state id: %i", ntf.changedStates[0].stateId);
+  } else {
+    // if all the queues for a group are full then send a notification
+    updateQueueGroup(cb,
+                     *ntf.notificationHeader.notificationObject,
+                     ntf.changedStates[0].newState);
+  }
+
+  TRACE_LEAVE();
+}
+
+/*
+ * Forward a state-change notification to the MSG-specific handler when it has
+ * a class id that isMSGStateChange() accepts; otherwise just trace it.
+ */
+static void handleStateChangeNotification(
+    MQD_CB *cb,
+    const SaNtfStateChangeNotificationT& stateChangeNotification) {
+  TRACE_ENTER();
+
+  const SaNtfClassIdT *classId =
+      stateChangeNotification.notificationHeader.notificationClassId;
+
+  if (!classId || !isMSGStateChange(*classId)) {
+    TRACE("ignoring non-MSG state change notification");
+  } else {
+    handleMsgObjectStateChangeNotification(cb, stateChangeNotification);
+  }
+
+  TRACE_LEAVE();
+}
+
+/*
+ * NTF notification callback for MQD.
+ *
+ * subscriptionId - id of the subscription the notification arrived on; only
+ *                  operStateSubId is handled
+ * notification   - the received notification; only state-change
+ *                  notifications are processed
+ *
+ * Nothing is processed unless the control block can be obtained and its HA
+ * state is SA_AMF_HA_ACTIVE.
+ */
+void mqdNtfNotificationCallback(SaNtfSubscriptionIdT subscriptionId,
+                                const SaNtfNotificationsT *notification) {
+  TRACE_ENTER();
+  MQD_CB *cb(static_cast<MQD_CB *>(ncshm_take_hdl(NCS_SERVICE_ID_MQD,
+                                                  gl_mqdinfo.inst_hdl)));
+
+  if (!cb) {
+    // no control block: nothing to inspect and no handle to give back
+    LOG_ER("can't get mqd control block");
+    TRACE_LEAVE();
+    return;
+  }
+
+  do {
+    if (cb->ha_state != SA_AMF_HA_ACTIVE)
+      break;
+
+    if (subscriptionId != operStateSubId) {
+      TRACE("unknown subscription id received in mqdNtfNotificationCallback: "
+            "%d",
+            subscriptionId);
+      break;
+    }
+
+    if (notification->notificationType == SA_NTF_TYPE_STATE_CHANGE) {
+      handleStateChangeNotification(
+        cb,
+        notification->notification.stateChangeNotification);
+    } else {
+      TRACE("ignoring NTF notification of type: %i",
+            notification->notificationType);
+    }
+  } while (false);
+
+  // balance the ncshm_take_hdl() above
+  ncshm_give_hdl(cb->hdl);
+
+  TRACE_LEAVE();
+}
+
+/*
+ * Subscribe to NTF state-change notifications under subscription id
+ * operStateSubId.
+ *
+ * ntfHandle - initialised NTF handle used to allocate the filter
+ *
+ * The filter is allocated with all element counts set to 0. It is freed once
+ * the subscribe call has been made, whether or not that call succeeded.
+ *
+ * Returns SA_AIS_OK on success, otherwise the first error reported by the
+ * allocate, subscribe or free call.
+ */
+SaAisErrorT mqdInitNtfSubscriptions(SaNtfHandleT ntfHandle) {
+  TRACE_ENTER();
+  SaAisErrorT rc(SA_AIS_OK);
+
+  do {
+    SaNtfStateChangeNotificationFilterT filter;
+
+    rc = saNtfStateChangeNotificationFilterAllocate(
+      ntfHandle,
+      &filter, 0, 0, 0, 0, 0, 0);
+
+    if (rc != SA_AIS_OK) {
+      LOG_ER("saNtfStateChangeNotificationFilterAllocate"
+             " failed: %i",
+             rc);
+      break;
+    }
+
+    // only the state-change slot of the filter handle set is populated
+    SaNtfNotificationTypeFilterHandlesT filterHandles = {
+      0, 0, filter.notificationFilterHandle, 0, 0
+    };
+
+    rc = saNtfNotificationSubscribe(&filterHandles,
+                                    operStateSubId);
+
+    if (rc != SA_AIS_OK)
+      LOG_ER("saNtfNotificationSubscribe failed: %i", rc);
+
+    // release the filter on both the success and the failure path
+    const SaAisErrorT freeRc(
+      saNtfNotificationFilterFree(filter.notificationFilterHandle));
+
+    if (freeRc != SA_AIS_OK) {
+      LOG_ER("saNtfNotificationFilterFree failed: %i", freeRc);
+      // keep an earlier subscribe error; otherwise report the free error
+      if (rc == SA_AIS_OK)
+        rc = freeRc;
+    }
+  } while (false);
+
+  TRACE_LEAVE();
+  return rc;
+}
diff --git a/src/msg/msgd/mqd_ntf.h b/src/msg/msgd/mqd_ntf.h
new file mode 100644
index 0000000..4d9c0f1
--- /dev/null
+++ b/src/msg/msgd/mqd_ntf.h
@@ -0,0 +1,49 @@
+/* -*- OpenSAF -*-
+ *
+ * (C) Copyright 2017 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
+ * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Genband
+ *
+ */
+
+/*****************************************************************************
+..............................................................................
+
+ FILE NAME: mqd_ntf.h
+
+..............................................................................
+
+ DESCRIPTION:
+
+ MQD Ntf Structures & Parameters.
+
+******************************************************************************/
+
+#ifndef MSG_MSGD_MQD_NTF_H_
+#define MSG_MSGD_MQD_NTF_H_
+
+#include <saNtf.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void mqdNtfNotificationCallback(SaNtfSubscriptionIdT subscriptionId,
+ const SaNtfNotificationsT *notification); /* NTF notification callback registered by MQD */
+
+SaAisErrorT mqdInitNtfSubscriptions(SaNtfHandleT); /* subscribes to state-change notifications on the given NTF handle */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/src/msg/msgnd/mqnd_db.h b/src/msg/msgnd/mqnd_db.h
index d04566e..fc7d926 100644
--- a/src/msg/msgnd/mqnd_db.h
+++ b/src/msg/msgnd/mqnd_db.h
@@ -74,6 +74,7 @@ typedef struct mqnd_queue_info {
SaMsgQueueHandleT listenerHandle; /* Listener queue handle */
SaNameT queueName;
SaSizeT size[SA_MSG_MESSAGE_LOWEST_PRIORITY + 1]; /* Size of the queue */
+ SaMsgQueueThresholdsT thresholds;
SaMsgQueueStatusT queueStatus; /* Status info of the queue */
SaMsgQueueSendingStateT sendingState; /* Sending state is removed from B.1.1,
but used internally */
@@ -91,6 +92,8 @@ typedef struct mqnd_queue_info {
uint32_t numberOfFullErrors[SA_MSG_MESSAGE_LOWEST_PRIORITY + 1];
SaUint64T totalQueueSize;
uint32_t shm_queue_index;
+ bool capacityReachedSent;
+ bool capacityAvailableSent;
} MQND_QUEUE_INFO;
typedef struct mqnd_qhndl_node {
@@ -127,6 +130,7 @@ typedef struct mqnd_queue_ckpt_info {
SaMsgQueueHandleT listenerHandle; /* Listener queue handle */
SaNameT queueName;
SaSizeT size[SA_MSG_MESSAGE_LOWEST_PRIORITY + 1]; /* Size of the queue */
+ SaMsgQueueThresholdsT thresholds;
SaMsgQueueCreationFlagsT creationFlags;
SaTimeT creationTime; /* Queue Creation time */
SaTimeT retentionTime;
@@ -145,6 +149,8 @@ typedef struct mqnd_queue_ckpt_info {
QueueStatsShm; /* Structure to store queue stats in shared memory */
MQND_TMR qtransfer_complete_tmr; /* Q Transfer Complete Timer */
uint32_t valid; /* To verify whether the checkpoint info is valid or not */
+ bool capacityReachedSent;
+ bool capacityAvailableSent;
} MQND_QUEUE_CKPT_INFO;
typedef struct mqnd_shm_info {
diff --git a/src/msg/msgnd/mqnd_evt.c b/src/msg/msgnd/mqnd_evt.c
index 0c2a612..fcbe4d6 100644
--- a/src/msg/msgnd/mqnd_evt.c
+++ b/src/msg/msgnd/mqnd_evt.c
@@ -42,6 +42,8 @@ static uint32_t mqnd_evt_proc_update_stats_shm(MQND_CB *cb,
MQSV_DSEND_EVT *evt);
static uint32_t mqnd_evt_proc_cb_dump(void);
static uint32_t mqnd_evt_proc_ret_time_set(MQND_CB *cb, MQSV_EVT *evt);
+static uint32_t mqnd_evt_proc_cap_set(MQND_CB *, MQSV_EVT *);
+static uint32_t mqnd_evt_proc_cap_get(MQND_CB *, MQSV_EVT *);
static void mqnd_dump_queue_status(MQND_CB *cb, SaMsgQueueStatusT *queueStatus,
uint32_t offset);
static void mqnd_dump_timer_info(MQND_TMR tmr);
@@ -186,6 +188,12 @@ static uint32_t mqnd_proc_mqp_req_msg(MQND_CB *cb, MQSV_EVT *evt)
case MQP_EVT_Q_RET_TIME_SET_REQ:
rc = mqnd_evt_proc_ret_time_set(cb, evt);
break;
+ case MQP_EVT_CAP_SET_REQ:
+ rc = mqnd_evt_proc_cap_set(cb, evt);
+ break;
+ case MQP_EVT_CAP_GET_REQ:
+ rc = mqnd_evt_proc_cap_get(cb, evt);
+ break;
default:
LOG_ER("%s:%u: unrecognized message type: %d", __FILE__,
__LINE__, evt->msg.mqp_req.type);
@@ -946,6 +954,191 @@ send_rsp:
}
/****************************************************************************
+ * Name          : sendStateChangeNotification
+ *
+ * Description   : Send an NTF state change notification reporting that the
+ *                 capacity status of a queue changed. Nothing is sent when
+ *                 the requested status is the one that was last sent
+ *                 successfully for this queue (tracked in
+ *                 qInfo->capacityReachedSent / qInfo->capacityAvailableSent).
+ *
+ * Arguments     : MQND_CB *cb - MQND CB pointer (not used by this function)
+ *                 MQND_QUEUE_INFO *qInfo - queue info struct; queueName is
+ *                     used as the notification object and the two "sent"
+ *                     flags are updated after a successful send
+ *                 SaMsgMessageCapacityStatusT status - status to send
+ *
+ * Return Values : None.
+ *
+ * Notes         : An NTF handle is initialized and finalized on every call
+ *                 that gets past the "already sent" check. Failures are
+ *                 only logged; the "sent" flags are then left untouched, so
+ *                 the next call with the same status tries again.
+ *****************************************************************************/
+static void sendStateChangeNotification(MQND_CB *cb,
+					MQND_QUEUE_INFO *qInfo,
+					SaMsgMessageCapacityStatusT status)
+{
+	SaNtfHandleT ntfHandle = 0;
+
+	do {
+		SaVersionT ntfVersion = { 'A', 1, 1 };
+
+		/* do we need to send it? */
+		if ((status == SA_MSG_QUEUE_CAPACITY_REACHED &&
+		     qInfo->capacityReachedSent) ||
+		    (status == SA_MSG_QUEUE_CAPACITY_AVAILABLE &&
+		     qInfo->capacityAvailableSent)) {
+			break;
+		}
+
+		SaAisErrorT rc = saNtfInitialize(&ntfHandle, 0, &ntfVersion);
+
+		if (rc != SA_AIS_OK) {
+			LOG_ER("saNtfInitialize failed: %i", rc);
+			break;
+		}
+
+		const SaNameT notifyingObject = {
+			sizeof("safApp=safMsgService") - 1,
+			"safApp=safMsgService"
+		};
+
+		SaNtfStateChangeNotificationT ntfStateChange = { 0 };
+
+		rc = saNtfStateChangeNotificationAllocate(
+			ntfHandle,
+			&ntfStateChange,
+			1, /* numCorrelated Notifications */
+			0, /* length addition text */
+			0, /* num additional info */
+			1, /* num of state changes */
+			0); /* variable data size */
+
+		if (rc != SA_AIS_OK) {
+			LOG_ER("saNtfStateChangeNotificationAllocate "
+			       "failed: %i", rc);
+			break;
+		}
+
+		*ntfStateChange.notificationHeader.eventType =
+			SA_NTF_OBJECT_STATE_CHANGE;
+
+		/* the queue itself is the object the notification is about */
+		memcpy(ntfStateChange.notificationHeader.notificationObject,
+		       &qInfo->queueName,
+		       sizeof(SaNameT));
+
+		memcpy(ntfStateChange.notificationHeader.notifyingObject,
+		       &notifyingObject,
+		       sizeof(SaNameT));
+
+		ntfStateChange.notificationHeader.notificationClassId
+			->vendorId = SA_NTF_VENDOR_ID_SAF;
+		ntfStateChange.notificationHeader.notificationClassId->majorId =
+			SA_SVC_MSG;
+		/* minor id tells the two notifications apart */
+		ntfStateChange.notificationHeader.notificationClassId->minorId =
+			(status == SA_MSG_QUEUE_CAPACITY_REACHED) ?
+				0x65 : 0x66;
+
+		*ntfStateChange.sourceIndicator = SA_NTF_OBJECT_OPERATION;
+
+		ntfStateChange.changedStates[0].stateId =
+			SA_MSG_DEST_CAPACITY_STATUS;
+
+		/*
+		 * An old state is only reported for the "available"
+		 * notification, where it is always "reached".
+		 */
+		ntfStateChange.changedStates[0].oldStatePresent =
+			(status == SA_MSG_QUEUE_CAPACITY_REACHED) ?
+				SA_FALSE : SA_TRUE;
+
+		if (status == SA_MSG_QUEUE_CAPACITY_AVAILABLE) {
+			ntfStateChange.changedStates[0].oldState =
+				SA_MSG_QUEUE_CAPACITY_REACHED;
+		}
+
+		ntfStateChange.changedStates[0].newState = status;
+
+		rc = saNtfNotificationSend(ntfStateChange.notificationHandle);
+
+		if (rc != SA_AIS_OK)
+			LOG_ER("saNtfNotificationSend failed: %i", rc);
+		else {
+			/* remember what was sent so it is not repeated */
+			if (status == SA_MSG_QUEUE_CAPACITY_REACHED) {
+				qInfo->capacityReachedSent = true;
+				qInfo->capacityAvailableSent = false;
+			} else {
+				qInfo->capacityReachedSent = false;
+				qInfo->capacityAvailableSent = true;
+			}
+		}
+
+		rc = saNtfNotificationFree(ntfStateChange.notificationHandle);
+
+		if (rc != SA_AIS_OK)
+			LOG_ER("saNtfNotificationFree failed: %i", rc);
+	} while (false);
+
+	/* ntfHandle stays 0 when we bailed out before/during initialize */
+	if (ntfHandle) {
+		SaAisErrorT rc = saNtfFinalize(ntfHandle);
+
+		if (rc != SA_AIS_OK)
+			LOG_ER("saNtfFinalize failed: %i", rc);
+	}
+}
+
+/****************************************************************************
+ * Name          : checkCapacity
+ *
+ * Description   : Compare the per-priority usage of a queue, as recorded in
+ *                 the shared memory stats, with the queue's capacity
+ *                 thresholds and send a state change notification when the
+ *                 comparison calls for one.
+ *
+ * Arguments     : MQND_CB *cb - MQND CB pointer (gives access to the shm)
+ *                 MQND_QUEUE_INFO *qInfo - queue info (thresholds, shm
+ *                     index and the "notification already sent" flags)
+ *
+ * Return Values : None.
+ *
+ * Notes         : "Reached" is reported only if no priority area is at or
+ *                 below its capacityReached threshold; "available" is
+ *                 reported as soon as one priority area is below its
+ *                 capacityAvailable threshold.
+ *****************************************************************************/
+static void checkCapacity(MQND_CB *cb, MQND_QUEUE_INFO *qInfo)
+{
+	MQND_QUEUE_CKPT_INFO *base = cb->mqnd_shm.shm_base_addr;
+	MQND_QUEUE_CKPT_INFO *ckpt = base + qInfo->shm_queue_index;
+	SaMsgMessageCapacityStatusT statusToSend =
+		SA_MSG_QUEUE_CAPACITY_REACHED;
+	bool notify = true;
+	int prio;
+
+	TRACE_ENTER();
+
+	/* send capacity notifications */
+	for (prio = SA_MSG_MESSAGE_HIGHEST_PRIORITY;
+	     prio <= SA_MSG_MESSAGE_LOWEST_PRIORITY; prio++) {
+		TRACE("capacityReached[%i]: %llu capacityAvailable: %llu "
+		      "queueUsed: %llu",
+		      prio,
+		      qInfo->thresholds.capacityReached[prio],
+		      qInfo->thresholds.capacityAvailable[prio],
+		      ckpt->QueueStatsShm.saMsgQueueUsage[prio].queueUsed);
+
+		/* "reached" requires every priority area to be over */
+		if (!qInfo->capacityReachedSent &&
+		    qInfo->thresholds.capacityReached[prio] >=
+			ckpt->QueueStatsShm.saMsgQueueUsage[prio].queueUsed) {
+			notify = false;
+			TRACE("not sending notification");
+			break;
+		}
+
+		/* "available" only needs one priority area to drop below */
+		if (!qInfo->capacityAvailableSent &&
+		    qInfo->thresholds.capacityAvailable[prio] >
+			ckpt->QueueStatsShm.saMsgQueueUsage[prio].queueUsed) {
+			TRACE("sending available notification");
+			statusToSend = SA_MSG_QUEUE_CAPACITY_AVAILABLE;
+			break;
+		}
+	}
+
+	TRACE("sendNotification: %i", notify);
+	if (notify)
+		sendStateChangeNotification(cb, qInfo, statusToSend);
+
+	TRACE_LEAVE();
+}
+
+/****************************************************************************
* Name : mqnd_evt_proc_update_stats_shm
*
* Description : Function to update stats of queue in shm when message is
@@ -998,7 +1191,10 @@ static uint32_t mqnd_evt_proc_update_stats_shm(MQND_CB *cb, MQSV_DSEND_EVT *evt)
statsReq = &evt->info.statsReq;
- if (cb->is_restart_done)
+ if (!cb->clm_node_joined) {
+ err = SA_AIS_ERR_UNAVAILABLE;
+ goto done;
+ } else if (cb->is_restart_done)
err = SA_AIS_OK;
else {
err = SA_AIS_ERR_TRY_AGAIN;
@@ -1039,6 +1235,8 @@ static uint32_t mqnd_evt_proc_update_stats_shm(MQND_CB *cb, MQSV_DSEND_EVT *evt)
goto done;
}
+ checkCapacity(cb, &qnode->qinfo);
+
done:
direct_rsp_evt =
@@ -1292,6 +1490,9 @@ static uint32_t mqnd_evt_proc_send_msg(MQND_CB *cb, MQSV_DSEND_EVT *evt)
mqnd_send_msg_update_stats_shm(cb, qnode, snd_msg->message.size,
snd_msg->message.priority);
+
+ checkCapacity(cb, &qnode->qinfo);
+
send_resp:
/* If the error happens in SendReceive case while sending the message
then return the response from here and don't wait for the reply
@@ -1741,6 +1942,162 @@ send_rsp:
}
/****************************************************************************
+ * Name          : mqnd_evt_proc_cap_set
+ *
+ * Description   : Function to set capacity thresholds of queue. The new
+ *                 thresholds are stored in the queue node after checking
+ *                 that no capacityReached value exceeds the size of its
+ *                 priority area, and a MQP_EVT_CAP_SET_RSP carrying the
+ *                 outcome is sent back to the requester.
+ *
+ * Arguments     : MQND_CB *cb - MQND CB pointer
+ *                 MQSV_EVT *evt - the capacity set request; evt->sinfo is
+ *                     used to route the response
+ *
+ * Return Values : NCSCC_RC_SUCCESS/Error (result of sending the response).
+ *
+ * Notes         : Error reported in the response:
+ *                 SA_AIS_ERR_UNAVAILABLE   - node has not joined CLM
+ *                 SA_AIS_ERR_TRY_AGAIN     - MQND restart not yet done
+ *                 SA_AIS_ERR_BAD_HANDLE    - queue handle not found
+ *                 SA_AIS_ERR_INVALID_PARAM - capacityReached > size
+ *****************************************************************************/
+static uint32_t mqnd_evt_proc_cap_set(MQND_CB *cb, MQSV_EVT *evt)
+{
+	SaAisErrorT err = SA_AIS_OK;
+	MQSV_EVT rsp_evt;
+	uint32_t rc = NCSCC_RC_SUCCESS;
+	TRACE_ENTER();
+
+	do {
+		MQND_QUEUE_NODE *qnode = NULL;
+		int i;
+
+		if (!cb->clm_node_joined) {
+			err = SA_AIS_ERR_UNAVAILABLE;
+			break;
+		} else if (cb->is_restart_done) {
+			err = SA_AIS_OK;
+		} else {
+			LOG_ER("%s:%u: ERR_TRY_AGAIN: MQND is not completely "
+			       "Initialized",
+			       __FILE__, __LINE__);
+			err = SA_AIS_ERR_TRY_AGAIN;
+			break;
+		}
+
+		/*
+		 * NOTE(review): the handle is read through retTimeSetReq
+		 * while the thresholds are read through capacity;
+		 * presumably both request layouts start with the queue
+		 * handle - confirm against mqsv_evt.h.
+		 */
+		mqnd_queue_node_get(cb,
+			evt->msg.mqp_req.info.retTimeSetReq.queueHandle,
+			&qnode);
+
+		/* If queue not found */
+		if (!qnode) {
+			LOG_ER("ERR_BAD_HANDLE: Get queue node Failed");
+			err = SA_AIS_ERR_BAD_HANDLE;
+			break;
+		}
+
+		/*
+		 * capacityAvailable and capacityReached have already been
+		 * checked in the agent with regards to each other
+		 */
+		for (i = SA_MSG_MESSAGE_HIGHEST_PRIORITY;
+		     i <= SA_MSG_MESSAGE_LOWEST_PRIORITY;
+		     i++)
+		{
+			if (qnode->qinfo.size[i] <
+			    evt->msg.mqp_req.info.capacity.thresholds
+				.capacityReached[i]) {
+				TRACE("size is less than capacity reached");
+				err = SA_AIS_ERR_INVALID_PARAM;
+				break;
+			}
+		}
+
+		/* the break above only left the for loop */
+		if (err != SA_AIS_OK)
+			break;
+
+		qnode->qinfo.thresholds =
+			evt->msg.mqp_req.info.capacity.thresholds;
+	} while (false);
+
+	/*Send the resp to MQA */
+	memset(&rsp_evt, 0, sizeof(MQSV_EVT));
+
+	rsp_evt.type = MQSV_EVT_MQP_RSP;
+	rsp_evt.msg.mqp_rsp.type = MQP_EVT_CAP_SET_RSP;
+	rsp_evt.msg.mqp_rsp.error = err;
+
+	rc = mqnd_mds_send_rsp(cb, &evt->sinfo, &rsp_evt);
+
+	if (rc != NCSCC_RC_SUCCESS)
+		LOG_ER(
+		    "Queue Capacity set :Mds Send Response Failed %" PRIx64,
+		    evt->sinfo.dest);
+	else
+		TRACE_1("Queue Capacity set: Mds Send Response Success");
+
+	TRACE_LEAVE2("Returned with return code %u", rc);
+	return rc;
+}
+
+/****************************************************************************
+ * Name          : mqnd_evt_proc_cap_get
+ *
+ * Description   : Function to get capacity thresholds of queue. Looks up
+ *                 the queue node and sends a MQP_EVT_CAP_GET_RSP carrying
+ *                 the outcome and, on success, the queue's thresholds.
+ *
+ * Arguments     : MQND_CB *cb - MQND CB pointer
+ *                 MQSV_EVT *evt - the capacity get request; evt->sinfo is
+ *                     used to route the response
+ *
+ * Return Values : NCSCC_RC_SUCCESS/Error (result of sending the response).
+ *
+ * Notes         : Error reported in the response:
+ *                 SA_AIS_ERR_UNAVAILABLE - node has not joined CLM
+ *                 SA_AIS_ERR_TRY_AGAIN   - MQND restart not yet done
+ *                 SA_AIS_ERR_BAD_HANDLE  - queue handle not found
+ *                 On any error the thresholds in the response are zero.
+ *****************************************************************************/
+static uint32_t mqnd_evt_proc_cap_get(MQND_CB *cb, MQSV_EVT *evt)
+{
+	SaAisErrorT err = SA_AIS_OK;
+	MQSV_EVT rsp_evt;
+	MQND_QUEUE_NODE *qnode = NULL;
+	uint32_t rc = NCSCC_RC_SUCCESS;
+
+	TRACE_ENTER();
+
+	do {
+		if (!cb->clm_node_joined) {
+			err = SA_AIS_ERR_UNAVAILABLE;
+			break;
+		} else if (cb->is_restart_done) {
+			err = SA_AIS_OK;
+		} else {
+			LOG_ER("%s:%u: ERR_TRY_AGAIN: MQND is not completely "
+			       "Initialized",
+			       __FILE__, __LINE__);
+			err = SA_AIS_ERR_TRY_AGAIN;
+			break;
+		}
+
+		/*
+		 * NOTE(review): the handle is read through retTimeSetReq;
+		 * presumably the capacity request layout also starts with
+		 * the queue handle - confirm against mqsv_evt.h.
+		 */
+		mqnd_queue_node_get(cb,
+			evt->msg.mqp_req.info.retTimeSetReq.queueHandle,
+			&qnode);
+
+		/* If queue not found */
+		if (!qnode) {
+			LOG_ER("ERR_BAD_HANDLE: Get queue node Failed");
+			err = SA_AIS_ERR_BAD_HANDLE;
+			break;
+		}
+	} while (false);
+
+	/*Send the resp to MQA */
+	memset(&rsp_evt, 0, sizeof(MQSV_EVT));
+
+	rsp_evt.type = MQSV_EVT_MQP_RSP;
+	rsp_evt.msg.mqp_rsp.type = MQP_EVT_CAP_GET_RSP;
+	rsp_evt.msg.mqp_rsp.error = err;
+
+	/*
+	 * qnode is still NULL on every error path above, so only read the
+	 * thresholds when the lookup succeeded.
+	 */
+	if (qnode)
+		rsp_evt.msg.mqp_rsp.info.capacity.thresholds =
+			qnode->qinfo.thresholds;
+
+	rc = mqnd_mds_send_rsp(cb, &evt->sinfo, &rsp_evt);
+
+	if (rc != NCSCC_RC_SUCCESS)
+		LOG_ER(
+		    "Queue Capacity get :Mds Send Response Failed %" PRIx64,
+		    evt->sinfo.dest);
+	else
+		TRACE_1("Queue Capacity get: Mds Send Response Success");
+
+	TRACE_LEAVE2("Returned with return code %u", rc);
+	return rc;
+}
+
+/****************************************************************************
* Name : mqnd_evt_proc_cb_dump
*
* Description : Function to print the control block infomration
diff --git a/src/msg/msgnd/mqnd_imm.c b/src/msg/msgnd/mqnd_imm.c
index 349790d..feb4aa4 100644
--- a/src/msg/msgnd/mqnd_imm.c
+++ b/src/msg/msgnd/mqnd_imm.c
@@ -460,7 +460,6 @@ SaAisErrorT mqnd_create_runtime_MsgQPriorityobject(SaStringT rname,
SaImmOiHandleT immOiHandle)
{
SaAisErrorT rc = SA_AIS_OK;
- SaUint64T def_val = 0;
int i = 0;
SaImmAttrValueT arr1[1], arr2[1], arr3[1], arr4[1];
SaImmAttrValuesT_2 attr_mqprio, attr_mqprioSize, attr_capavail,
@@ -484,8 +483,8 @@ SaAisErrorT mqnd_create_runtime_MsgQPriorityobject(SaStringT rname,
arr1[0] = &mqprdn;
arr2[0] = &(qnode->qinfo.size[i]);
- arr3[0] = &def_val; /* not implemented */
- arr4[0] = &def_val; /* not implemented */
+ arr3[0] = &qnode->qinfo.thresholds.capacityAvailable[i];
+ arr4[0] = &qnode->qinfo.thresholds.capacityReached[i];
attr_mqprio.attrName = "safMqPrio";
attr_mqprio.attrValueType = SA_IMM_ATTR_SASTRINGT;
diff --git a/src/msg/msgnd/mqnd_util.c b/src/msg/msgnd/mqnd_util.c
index 8dd79fb..10c78f5 100644
--- a/src/msg/msgnd/mqnd_util.c
+++ b/src/msg/msgnd/mqnd_util.c
@@ -79,6 +79,10 @@ uint32_t mqnd_queue_create(MQND_CB *cb, MQP_OPEN_REQ *open, MDS_DEST *rcvr_mqa,
i <= SA_MSG_MESSAGE_LOWEST_PRIORITY; i++) {
qnode->qinfo.totalQueueSize += open->creationAttributes.size[i];
qnode->qinfo.size[i] = open->creationAttributes.size[i];
+ qnode->qinfo.thresholds.capacityReached[i] =
+ qnode->qinfo.size[i];
+ qnode->qinfo.thresholds.capacityAvailable[i] =
+ qnode->qinfo.size[i];
qnode->qinfo.queueStatus.saMsgQueueUsage[i].queueSize =
open->creationAttributes.size[i];
}
--
2.9.5
------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel