Hi Srinivas,

  You need to run that test on a node that is not the active controller.


Alex

________________________________
From: Srinivas Mangipudy <[email protected]>
Sent: Wednesday, November 8, 2017 4:19:14 AM
To: Alex Jones
Cc: [email protected]
Subject: RE: [PATCH 1/1] msg: add support for critical capacity thresholds 
[#2625]

________________________________
NOTICE: This email was received from an EXTERNAL sender
________________________________


Hi Alex,

I executed “msgtest” and it failed with the below error:

Suite 26: SA_AIS_ERR_UNAVAILABLE Test Suite

error - saImmOmAdminOperationInvoke_2 admin-op RETURNED: 
SA_AIS_ERR_NOT_SUPPORTED (19)

The process tried to execute the command “immadm -o 2 
safNode=SC-2,safCluster=myClmCluster” and it returned SA_AIS_ERR_NOT_SUPPORTED 
error.

Logs had below info:

Nov  8 14:42:45 osaf-VirtualBox osafclmd[3943]: NO Lock on active node not 
allowed

Nov  8 14:42:45 osaf-VirtualBox osafclmd[3943]: NO clms_imm_node_lock failed

Nov  8 14:43:20 osaf-VirtualBox osafclmd[3943]: NO Lock on active node not 
allowed

Nov  8 14:43:20 osaf-VirtualBox osafclmd[3943]: NO clms_imm_node_lock failed

Thank you

Srinivas

-----Original Message-----
From: Alex Jones [mailto:[email protected]]
Sent: Tuesday, November 7, 2017 11:45 PM
To: Srinivas Mangipudy <[email protected]>
Cc: [email protected]; Alex Jones <[email protected]>
Subject: [PATCH 1/1] msg: add support for critical capacity thresholds [#2625]

This commit implements Sections 3.4.12 and 6.2.2 of the B.03.01 MSG spec.

---

 src/msg/Makefile.am                        |    8 +-

 src/msg/agent/mqa_api.c                    |  330 +++++++++

 src/msg/apitest/test_CapacityThresholds.cc | 1049 ++++++++++++++++++++++++++++

 src/msg/apitest/test_ErrUnavailable.cc     |  110 ++-

 src/msg/common/mqsv_edu.c                  |   74 +-

 src/msg/common/mqsv_evt.h                  |   15 +-

 src/msg/msgd/mqd_api.c                     |   64 +-

 src/msg/msgd/mqd_db.h                      |    4 +

 src/msg/msgd/mqd_ntf.cc                    |  310 ++++++++

 src/msg/msgd/mqd_ntf.h                     |   49 ++

 src/msg/msgnd/mqnd_db.h                    |    6 +

 src/msg/msgnd/mqnd_evt.c                   |  359 +++++++++-

 src/msg/msgnd/mqnd_imm.c                   |    5 +-

 src/msg/msgnd/mqnd_util.c                  |    4 +

 14 files changed, 2374 insertions(+), 13 deletions(-)

 create mode 100644 src/msg/apitest/test_CapacityThresholds.cc

 create mode 100644 src/msg/msgd/mqd_ntf.cc

 create mode 100644 src/msg/msgd/mqd_ntf.h

diff --git a/src/msg/Makefile.am b/src/msg/Makefile.am

index aeec6ca..c308072 100644

--- a/src/msg/Makefile.am

+++ b/src/msg/Makefile.am

@@ -97,6 +97,7 @@ noinst_HEADERS += \

        src/msg/msgd/mqd_dl_api.h \

        src/msg/msgd/mqd_imm.h \

        src/msg/msgd/mqd_mem.h \

+       src/msg/msgd/mqd_ntf.h \

        src/msg/msgd/mqd_red.h \

        src/msg/msgd/mqd_saf.h \

        src/msg/msgd/mqd_tmr.h \

@@ -158,6 +159,7 @@ bin_osafmsgnd_LDADD = \

        lib/libmsg_common.la \

        lib/libSaAmf.la \

        lib/libSaClm.la \

+       lib/libSaNtf.la \

        lib/libosaf_common.la \

        lib/libSaImmOi.la \

        lib/libSaImmOm.la \

@@ -182,7 +184,8 @@ bin_osafmsgd_SOURCES = \

        src/msg/msgd/mqd_mds.c \

        src/msg/msgd/mqd_red.c \

        src/msg/msgd/mqd_saf.c \

-       src/msg/msgd/mqd_tmr.c

+       src/msg/msgd/mqd_tmr.c \

+       src/msg/msgd/mqd_ntf.cc



 bin_osafmsgd_LDADD = \

        lib/libmsg_common.la \

@@ -191,6 +194,7 @@ bin_osafmsgd_LDADD = \

        lib/libosaf_common.la \

        lib/libSaImmOi.la \

        lib/libSaImmOm.la \

+       lib/libSaNtf.la \

        lib/libopensaf_core.la



 if ENABLE_TESTS

@@ -205,6 +209,7 @@ noinst_HEADERS += \

 bin_msgtest_SOURCES = \

   src/msg/apitest/msgtest.c \

   src/msg/apitest/test_saMsgVersionT.cc \

+  src/msg/apitest/test_CapacityThresholds.cc \

   src/msg/apitest/test_ErrUnavailable.cc \

   src/msg/apitest/tet_mqa_conf.c \

   src/msg/apitest/tet_mqsv_util.c \

@@ -212,6 +217,7 @@ bin_msgtest_SOURCES = \



 bin_msgtest_LDADD = \

   lib/libSaMsg.la \

+  lib/libSaNtf.la \

   lib/libopensaf_core.la \

   lib/libapitest.la



diff --git a/src/msg/agent/mqa_api.c b/src/msg/agent/mqa_api.c

index eeec00e..0070aaf 100644

--- a/src/msg/agent/mqa_api.c

+++ b/src/msg/agent/mqa_api.c

@@ -5600,6 +5600,336 @@ done:

        return rc;

 }



+/****************************************************************************
+  Name          : saMsgQueueCapacityThresholdsSet
+
+  Description   : This routine sets the critical capacity thresholds for the
+                 queue. The request is validated locally, then sent
+                 synchronously to the MQND; the error code carried in the
+                 MQND response is returned to the caller.
+
+  Arguments     : SaMsgQueueHandleT queueHandle
+               : const SaMsgQueueThresholdsT *thresholds
+                 (for every priority capacityAvailable must not be greater
+                 than capacityReached)
+
+  Return Values : SA_AIS_OK                - thresholds accepted by the MQND
+                 SA_AIS_ERR_BAD_HANDLE    - control block or queue not found
+                 SA_AIS_ERR_LIBRARY       - control block lock failed
+                 SA_AIS_ERR_VERSION       - client major version is below
+                                            MQA_MAJOR_VERSION
+                 SA_AIS_ERR_UNAVAILABLE   - node has not joined CLM or the
+                                            client is stale
+                 SA_AIS_ERR_INVALID_PARAM - thresholds is NULL or inconsistent
+                 SA_AIS_ERR_TRY_AGAIN     - MQND is down or MDS send failed
+                 SA_AIS_ERR_TIMEOUT       - MDS send timed out
+                 SA_AIS_ERR_NO_RESOURCES  - other MDS error or no response
+                 otherwise the error reported in the MQND response
+
+  Notes         : The control block write lock is held until the synchronous
+                 MDS exchange has completed.
+******************************************************************************/
+SaAisErrorT saMsgQueueCapacityThresholdsSet(SaMsgQueueHandleT queueHandle,
+               const SaMsgQueueThresholdsT *thresholds)
+{
+       SaAisErrorT rc = SA_AIS_OK;
+       MQA_CB *mqa_cb;
+       bool locked = false;
+
+       /* single-pass block: every failure breaks out to the common cleanup */
+       do {
+               MQSV_EVT cap_evt;
+               MQSV_EVT *out_evt = NULL;
+               MQA_QUEUE_INFO *queue_node;
+               uint8_t mds_rc;
+               int i;
+
+               TRACE_ENTER2("SaMsgQueueHandle %llu ", queueHandle);
+
+               /* retrieve MQA CB */
+               mqa_cb = (MQA_CB *)m_MQSV_MQA_RETRIEVE_MQA_CB;
+               if (!mqa_cb) {
+                       TRACE_2("ERR_BAD_HANDLE: Control block retrieval "
+                                       "failed");
+                       rc = SA_AIS_ERR_BAD_HANDLE;
+                       break;
+               }
+
+               if (m_NCS_LOCK(&mqa_cb->cb_lock, NCS_LOCK_WRITE) !=
+                               NCSCC_RC_SUCCESS) {
+                       TRACE_4("ERR_LIBRARY: Lock failed for control block "
+                                       "write");
+                       rc = SA_AIS_ERR_LIBRARY;
+                       break;
+               }
+
+               /* remember that the lock must be released during cleanup */
+               locked = true;
+
+               /* Check if queueHandle is present in the tree */
+               if ((queue_node = mqa_queue_tree_find_and_add(
+                       mqa_cb, queueHandle, false, NULL, 0)) == NULL) {
+                       TRACE_2("ERR_BAD_HANDLE: Queue Database Find Failed");
+                       rc = SA_AIS_ERR_BAD_HANDLE;
+                       break;
+               }
+
+               if (queue_node->client_info->version.majorVersion <
+                       MQA_MAJOR_VERSION) {
+                       TRACE_2("ERR_VERSION: client not B.03.01");
+                       rc = SA_AIS_ERR_VERSION;
+                       break;
+               }
+
+               if (queue_node->client_info->version.majorVersion ==
+                       MQA_MAJOR_VERSION) {
+                       if (!mqa_cb->clm_node_joined ||
+                               queue_node->client_info->isStale) {
+                               TRACE_2("ERR_UNAVAILABLE: node is not cluster "
+                                       "member");
+                               rc = SA_AIS_ERR_UNAVAILABLE;
+                               break;
+                       }
+               }
+
+               if (!thresholds) {
+                       TRACE_2("ERR_INVALID_PARAM: thresholds is 0");
+                       rc = SA_AIS_ERR_INVALID_PARAM;
+                       break;
+               }
+
+               /* the "available" mark may not lie above the "reached" mark */
+               for (i = SA_MSG_MESSAGE_HIGHEST_PRIORITY;
+                               i <= SA_MSG_MESSAGE_LOWEST_PRIORITY;
+                               i++) {
+                       if (thresholds->capacityAvailable[i] >
+                                       thresholds->capacityReached[i]) {
+                               TRACE_2("ERR_INVALID_PARAM: Available greater "
+                                               "than Reached");
+                               rc = SA_AIS_ERR_INVALID_PARAM;
+                               break;
+                       }
+               }
+
+               /* the break above only left the for loop */
+               if (rc != SA_AIS_OK)
+                       break;
+
+               /* Check if mqnd is up */
+               if (!mqa_cb->is_mqnd_up) {
+                       TRACE_2("ERR_TRY_AGAIN: MQND is down");
+                       rc = SA_AIS_ERR_TRY_AGAIN;
+                       break;
+               }
+
+               /* populate the structure */
+               memset(&cap_evt, 0, sizeof(MQSV_EVT));
+               cap_evt.type = MQSV_EVT_MQP_REQ;
+               cap_evt.msg.mqp_req.type = MQP_EVT_CAP_SET_REQ;
+               cap_evt.msg.mqp_req.info.capacity.queueHandle = queueHandle;
+               cap_evt.msg.mqp_req.info.capacity.thresholds = *thresholds;
+               cap_evt.msg.mqp_req.agent_mds_dest = mqa_cb->mqa_mds_dest;
+
+               /* send the request to the MQND */
+               mds_rc = mqa_mds_msg_sync_send(mqa_cb->mqa_mds_hdl,
+                                       &mqa_cb->mqnd_mds_dest,
+                                       &cap_evt,
+                                       &out_evt,
+                                       MQSV_WAIT_TIME);
+
+               /* map the MDS result onto an AIS error code */
+               switch (mds_rc) {
+                       case NCSCC_RC_SUCCESS:
+                               break;
+
+                       case NCSCC_RC_REQ_TIMOUT:
+                               TRACE_2("ERR_TIMEOUT: Message Send through MDS "
+                                               "Timeout %" PRIx64,
+                                       mqa_cb->mqa_mds_dest);
+                               rc = SA_AIS_ERR_TIMEOUT;
+                               break;
+
+                       case NCSCC_RC_FAILURE:
+                               TRACE_2("ERR_TRY_AGAIN: Message Send through "
+                                               "MDS Failure %" PRIx64,
+                                       mqa_cb->mqa_mds_dest);
+                               rc = SA_AIS_ERR_TRY_AGAIN;
+                               break;
+
+                       default:
+                               TRACE_4("ERR_RESOURCES: Message Send through "
+                                               "MDS Failure %" PRIx64,
+                                       mqa_cb->mqa_mds_dest);
+                               rc = SA_AIS_ERR_NO_RESOURCES;
+                               break;
+               }
+
+               if (mds_rc != NCSCC_RC_SUCCESS)
+                       break;
+
+               /* the final result is whatever the MQND reported */
+               if (out_evt) {
+                       rc = out_evt->msg.mqp_rsp.error;
+                       m_MMGR_FREE_MQA_EVT(out_evt);
+               } else {
+                       TRACE_4("ERR_RESOURCES: Response not received from "
+                                       "MQND");
+                       rc = SA_AIS_ERR_NO_RESOURCES;
+               }
+       } while (false);
+
+       if (locked)
+               m_NCS_UNLOCK(&mqa_cb->cb_lock, NCS_LOCK_WRITE);
+
+       if (mqa_cb)
+               m_MQSV_MQA_GIVEUP_MQA_CB;
+
+       if (rc == SA_AIS_OK) {
+               TRACE_LEAVE2(" Success ");
+       } else {
+               TRACE_LEAVE2(" Failed with return code - %d ", rc);
+       }
+
+       return rc;
+}

+

+/****************************************************************************
+  Name          : saMsgQueueCapacityThresholdsGet
+
+  Description   : This routine gets the critical capacity thresholds for the
+                 queue. The request is sent synchronously to the MQND and,
+                 when the MQND reports SA_AIS_OK, the thresholds carried in
+                 the response are copied to the caller.
+
+  Arguments     : SaMsgQueueHandleT queueHandle
+               : SaMsgQueueThresholdsT *thresholds
+                 (output; written only when SA_AIS_OK is returned)
+
+  Return Values : SA_AIS_OK                - thresholds copied to *thresholds
+                 SA_AIS_ERR_BAD_HANDLE    - control block or queue not found
+                 SA_AIS_ERR_LIBRARY       - control block lock failed
+                 SA_AIS_ERR_VERSION       - client major version is below
+                                            MQA_MAJOR_VERSION
+                 SA_AIS_ERR_UNAVAILABLE   - node has not joined CLM or the
+                                            client is stale
+                 SA_AIS_ERR_INVALID_PARAM - thresholds is NULL
+                 SA_AIS_ERR_TRY_AGAIN     - MQND is down or MDS send failed
+                 SA_AIS_ERR_TIMEOUT       - MDS send timed out
+                 SA_AIS_ERR_NO_RESOURCES  - other MDS error or no response
+                 otherwise the error reported in the MQND response
+
+  Notes         : The control block write lock is held until the synchronous
+                 MDS exchange has completed.
+******************************************************************************/
+SaAisErrorT saMsgQueueCapacityThresholdsGet(SaMsgQueueHandleT queueHandle,
+               SaMsgQueueThresholdsT *thresholds)
+{
+       SaAisErrorT rc = SA_AIS_OK;
+       MQA_CB *mqa_cb;
+       bool locked = false;
+
+       /* single-pass block: every failure breaks out to the common cleanup */
+       do {
+               MQSV_EVT cap_evt;
+               MQSV_EVT *out_evt = NULL;
+               MQA_QUEUE_INFO *queue_node;
+               uint8_t mds_rc;
+
+               TRACE_ENTER2("SaMsgQueueHandle %llu ", queueHandle);
+
+               /* retrieve MQA CB */
+               mqa_cb = (MQA_CB *)m_MQSV_MQA_RETRIEVE_MQA_CB;
+               if (!mqa_cb) {
+                       TRACE_2("ERR_BAD_HANDLE: Control block retrieval "
+                                       "failed");
+                       rc = SA_AIS_ERR_BAD_HANDLE;
+                       break;
+               }
+
+               if (m_NCS_LOCK(&mqa_cb->cb_lock, NCS_LOCK_WRITE) !=
+                               NCSCC_RC_SUCCESS) {
+                       TRACE_4("ERR_LIBRARY: Lock failed for control block "
+                                       "write");
+                       rc = SA_AIS_ERR_LIBRARY;
+                       break;
+               }
+
+               /* remember that the lock must be released during cleanup */
+               locked = true;
+
+               /* Check if queueHandle is present in the tree */
+               if ((queue_node = mqa_queue_tree_find_and_add(
+                       mqa_cb, queueHandle, false, NULL, 0)) == NULL) {
+                       TRACE_2("ERR_BAD_HANDLE: Queue Database Find Failed");
+                       rc = SA_AIS_ERR_BAD_HANDLE;
+                       break;
+               }
+
+               if (queue_node->client_info->version.majorVersion <
+                       MQA_MAJOR_VERSION) {
+                       TRACE_2("ERR_VERSION: client not B.03.01");
+                       rc = SA_AIS_ERR_VERSION;
+                       break;
+               }
+
+               if (queue_node->client_info->version.majorVersion ==
+                       MQA_MAJOR_VERSION) {
+                       if (!mqa_cb->clm_node_joined ||
+                               queue_node->client_info->isStale) {
+                               TRACE_2("ERR_UNAVAILABLE: node is not cluster "
+                                       "member");
+                               rc = SA_AIS_ERR_UNAVAILABLE;
+                               break;
+                       }
+               }
+
+               if (!thresholds) {
+                       TRACE_2("ERR_INVALID_PARAM: thresholds is 0");
+                       rc = SA_AIS_ERR_INVALID_PARAM;
+                       break;
+               }
+
+               /* Check if mqnd is up (only the MQND flag is consulted) */
+               if (!mqa_cb->is_mqnd_up) {
+                       TRACE_2("ERR_TRY_AGAIN: MQND is down");
+                       rc = SA_AIS_ERR_TRY_AGAIN;
+                       break;
+               }
+
+               /* populate the structure */
+               memset(&cap_evt, 0, sizeof(MQSV_EVT));
+               cap_evt.type = MQSV_EVT_MQP_REQ;
+               cap_evt.msg.mqp_req.type = MQP_EVT_CAP_GET_REQ;
+               cap_evt.msg.mqp_req.info.capacity.queueHandle = queueHandle;
+               cap_evt.msg.mqp_req.agent_mds_dest = mqa_cb->mqa_mds_dest;
+
+               /* send the request to the MQND */
+               mds_rc = mqa_mds_msg_sync_send(mqa_cb->mqa_mds_hdl,
+                                       &mqa_cb->mqnd_mds_dest,
+                                       &cap_evt,
+                                       &out_evt,
+                                       MQSV_WAIT_TIME);
+
+               /* map the MDS result onto an AIS error code */
+               switch (mds_rc) {
+                       case NCSCC_RC_SUCCESS:
+                               break;
+
+                       case NCSCC_RC_REQ_TIMOUT:
+                               TRACE_2("ERR_TIMEOUT: Message Send through MDS "
+                                               "Timeout %" PRIx64,
+                                       mqa_cb->mqa_mds_dest);
+                               rc = SA_AIS_ERR_TIMEOUT;
+                               break;
+
+                       case NCSCC_RC_FAILURE:
+                               TRACE_2("ERR_TRY_AGAIN: Message Send through "
+                                               "MDS Failure %" PRIx64,
+                                       mqa_cb->mqa_mds_dest);
+                               rc = SA_AIS_ERR_TRY_AGAIN;
+                               break;
+
+                       default:
+                               TRACE_4("ERR_RESOURCES: Message Send through "
+                                               "MDS Failure %" PRIx64,
+                                       mqa_cb->mqa_mds_dest);
+                               rc = SA_AIS_ERR_NO_RESOURCES;
+                               break;
+               }
+
+               if (mds_rc != NCSCC_RC_SUCCESS)
+                       break;
+
+               if (out_evt) {
+                       rc = out_evt->msg.mqp_rsp.error;
+
+                       /* hand the thresholds back only on success */
+                       if (rc == SA_AIS_OK) {
+                               *thresholds = out_evt->msg.mqp_rsp.info.
+                                       capacity.thresholds;
+                       }
+
+                       m_MMGR_FREE_MQA_EVT(out_evt);
+               } else {
+                       TRACE_4("ERR_RESOURCES: Response not received from "
+                                       "MQND");
+                       rc = SA_AIS_ERR_NO_RESOURCES;
+               }
+       } while (false);
+
+       if (locked)
+               m_NCS_UNLOCK(&mqa_cb->cb_lock, NCS_LOCK_WRITE);
+
+       if (mqa_cb)
+               m_MQSV_MQA_GIVEUP_MQA_CB;
+
+       if (rc == SA_AIS_OK) {
+               TRACE_LEAVE2(" Success ");
+       } else {
+               TRACE_LEAVE2(" Failed with return code - %d ", rc);
+       }
+
+       return rc;
+}

 /*****************************************************************************

   Name          : msgget_timer_expired



diff --git a/src/msg/apitest/test_CapacityThresholds.cc 
b/src/msg/apitest/test_CapacityThresholds.cc

new file mode 100644

index 0000000..38d2144

--- /dev/null

+++ b/src/msg/apitest/test_CapacityThresholds.cc

@@ -0,0 +1,1049 @@

+#include <cassert>

+#include <condition_variable>

+#include <cstdio>

+#include <cstring>

+#include <iostream>

+#include <mutex>

+#include <queue>

+#include <thread>

+#include <sys/poll.h>

+#include "msg/apitest/msgtest.h"

+#include "msg/apitest/tet_mqsv.h"

+#include <saMsg.h>

+#include <saNtf.h>

+// MSG API version handed to saMsgInitialize by the tests in this file.

+static SaVersionT msg3_1 = {'B', 3, 0};

+// State shared between the test cases, ntfCallback and the NTF polling loop.

+typedef std::queue<SaMsgMessageCapacityStatusT> TestQueue;

+typedef std::queue<int> ResultQueue;

+static TestQueue testQueue;  // expected capacity states, consumed by ntfCallback

+static ResultQueue resultQueue;  // TET_PASS/TET_FAIL pushed per handled notification

+static std::mutex m;  // guards 'ready'

+static std::condition_variable cv;  // notified by ntfCallback once testQueue drains

+static bool ready(false);  // set by ntfCallback when no expectation is left

+static bool pollDone(false);  // set by testNtfCleanup; ends testNtfStateChange's loop

+// Name of the message queue opened by openQueue().

+static const SaNameT queueName = {

+  sizeof("safMq=TestQueue") - 1,  // length without the terminating NUL

+  "safMq=TestQueue"

+};

+// Queue group name (not referenced in the part of the file visible here).

+static const SaNameT queueGroupName = {

+  sizeof("safMqg=TestQueueGroup") - 1,  // length without the terminating NUL

+  "safMqg=TestQueueGroup"

+};

+// Attributes passed to saMsgQueueOpen when the test queue is created.

+static const SaMsgQueueCreationAttributesT creationAttributes = {

+  0,  // presumably creationFlags (none) - confirm against saMsg.h

+  { 5, 5, 5, 5 },  // presumably the size of each priority area - confirm

+  SA_TIME_ONE_SECOND  // presumably the retention time - confirm

+};

+

+// Opens the shared test queue (queueName) on msgHandle with
+// SA_MSG_QUEUE_CREATE and returns the resulting queue handle. The call is
+// asserted to succeed.
+static SaMsgQueueHandleT openQueue(SaMsgHandleT msgHandle) {
+  SaMsgQueueHandleT opened;
+  const SaAisErrorT rc = saMsgQueueOpen(msgHandle, &queueName,
+                                        &creationAttributes,
+                                        SA_MSG_QUEUE_CREATE, SA_TIME_MAX,
+                                        &opened);
+
+  assert(rc == SA_AIS_OK);
+  return opened;
+}

+

+

+// NTF notification callback used by the capacity-threshold tests.
+//
+// Pops the next expected capacity state from testQueue, checks that the
+// received notification is the matching MSG state-change notification and
+// pushes TET_PASS or TET_FAIL onto resultQueue. When no expectation is left
+// afterwards, 'ready' is set under the mutex and the waiter on 'cv' is woken.
+//
+// subscriptionId is not used. n is the notification delivered by NTF.
+static void ntfCallback(SaNtfSubscriptionIdT subscriptionId,
+                        const SaNtfNotificationsT *n) {
+  // A notification may arrive while no test is waiting for one. Calling
+  // front()/pop() on an empty std::queue is undefined behaviour, so such a
+  // notification is simply ignored.
+  if (testQueue.empty()) {
+    return;
+  }
+
+  int asyncTestResult(TET_FAIL);
+  int asyncTest(testQueue.front());
+  testQueue.pop();
+
+  // Single-pass block: the first failed check breaks out with TET_FAIL.
+  do {
+    if (n->notificationType != SA_NTF_TYPE_STATE_CHANGE) {
+      break;
+    }
+
+    const auto &stateChange = n->notification.stateChangeNotification;
+    const auto &header = stateChange.notificationHeader;
+    const auto &classId = *header.notificationClassId;
+
+    if (classId.vendorId != SA_NTF_VENDOR_ID_SAF) {
+      break;
+    }
+
+    if (classId.majorId != SA_SVC_MSG) {
+      break;
+    }
+
+    if (stateChange.numStateChanges != 1) {
+      break;
+    }
+
+    if (*stateChange.sourceIndicator != SA_NTF_OBJECT_OPERATION) {
+      break;
+    }
+
+    // Exactly one changed state is present (checked above).
+    const auto &change = stateChange.changedStates[0];
+
+    if (change.stateId != SA_MSG_DEST_CAPACITY_STATUS) {
+      break;
+    }
+
+    const auto &notifier = *header.notifyingObject;
+
+    if (notifier.length != sizeof("safApp=safMsgService") - 1) {
+      break;
+    }
+
+    if (memcmp(notifier.value, "safApp=safMsgService", notifier.length)) {
+      break;
+    }
+
+    // Per expected state: minor class id, new state and, for the
+    // "available" transitions, the old state that must be reported.
+    if (asyncTest == SA_MSG_QUEUE_CAPACITY_REACHED) {
+      if (classId.minorId != 0x65) {
+        break;
+      }
+
+      if (change.newState != SA_MSG_QUEUE_CAPACITY_REACHED) {
+        break;
+      }
+
+      if (change.oldStatePresent != SA_FALSE) {
+        break;
+      }
+
+      asyncTestResult = TET_PASS;
+    } else if (asyncTest == SA_MSG_QUEUE_CAPACITY_AVAILABLE) {
+      if (classId.minorId != 0x66) {
+        break;
+      }
+
+      if (change.newState != SA_MSG_QUEUE_CAPACITY_AVAILABLE) {
+        break;
+      }
+
+      if (change.oldStatePresent != SA_TRUE) {
+        break;
+      }
+
+      if (change.oldState != SA_MSG_QUEUE_CAPACITY_REACHED) {
+        break;
+      }
+
+      asyncTestResult = TET_PASS;
+    } else if (asyncTest == SA_MSG_QUEUE_GROUP_CAPACITY_REACHED) {
+      if (classId.minorId != 0x67) {
+        break;
+      }
+
+      if (change.newState != SA_MSG_QUEUE_GROUP_CAPACITY_REACHED) {
+        break;
+      }
+
+      if (change.oldStatePresent != SA_FALSE) {
+        break;
+      }
+
+      asyncTestResult = TET_PASS;
+    } else if (asyncTest == SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE) {
+      if (classId.minorId != 0x68) {
+        break;
+      }
+
+      if (change.newState != SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE) {
+        break;
+      }
+
+      if (change.oldStatePresent != SA_TRUE) {
+        break;
+      }
+
+      if (change.oldState != SA_MSG_QUEUE_GROUP_CAPACITY_REACHED) {
+        break;
+      }
+
+      asyncTestResult = TET_PASS;
+    } else {
+      // Only the four capacity states above are ever queued by the tests.
+      assert(false);
+      break;
+    }
+  } while (false);
+
+  resultQueue.push(asyncTestResult);
+
+  // Last expected notification handled: wake whoever waits on 'cv'.
+  if (testQueue.empty()) {
+    {
+      std::lock_guard<std::mutex> lk(m);
+      ready = true;
+    }
+
+    cv.notify_one();
+  }
+}

+

+// Dispatch loop for NTF notifications.
+//
+// Polls ntfSelObj with a one-second timeout and calls saNtfDispatch for one
+// pending callback whenever the descriptor is readable. The loop ends when
+// pollDone is set, when poll() fails, or when saNtfDispatch returns an
+// error; SA_AIS_ERR_BAD_HANDLE is not reported on stderr.
+//
+// NOTE(review): pollDone is a plain bool. If this loop runs on a different
+// thread than testNtfCleanup (not visible here), it should be made atomic.
+static void testNtfStateChange(SaNtfHandleT ntfHandle,
+                               SaSelectionObjectT ntfSelObj) {
+  pollfd fd = { static_cast<int>(ntfSelObj), POLLIN };
+
+  while (!pollDone) {
+    if (poll(&fd, 1, 1000) < 0) {
+      // A signal interrupting poll() is not a failure; try again instead of
+      // silently ending notification dispatch.
+      if (errno == EINTR)
+        continue;
+
+      m_TET_MQSV_PRINTF("poll FAILED: %i\n", errno);
+      break;
+    }
+
+    if (fd.revents & POLLIN) {
+      SaAisErrorT rc(saNtfDispatch(ntfHandle, SA_DISPATCH_ONE));
+
+      if (rc != SA_AIS_OK) {
+        if (rc != SA_AIS_ERR_BAD_HANDLE)
+          std::cerr << "saNtfDispatch failed: " << rc << std::endl;
+        break;
+      }
+    }
+  }
+}

+

+// Initializes NTF for the capacity-threshold tests.
+//
+// Registers ntfCallback, subscribes (subscription id 1) to state-change
+// notifications through a filter allocated with all counts zero, frees the
+// filter again and fetches the selection object. On success ntfHandle and
+// ntfSelObj are filled in, pollDone is cleared and TET_PASS is returned.
+//
+// On any failure TET_FAIL is returned. Resources acquired up to that point
+// (the filter and the NTF handle) are released first, so a failed call does
+// not leak an NTF session.
+static int testNtfInit(SaNtfHandleT& ntfHandle,
+                       SaSelectionObjectT& ntfSelObj) {
+  SaNtfCallbacksT callbacks = { ntfCallback, 0 };
+  SaVersionT version = { 'A', 1, 0 };
+
+  if (saNtfInitialize(&ntfHandle, &callbacks, &version) != SA_AIS_OK)
+    return TET_FAIL;
+
+  SaNtfStateChangeNotificationFilterT stateChangeFilter;
+  SaAisErrorT rc(saNtfStateChangeNotificationFilterAllocate(
+      ntfHandle, &stateChangeFilter, 0, 0, 0, 0, 0, 0));
+
+  if (rc == SA_AIS_OK) {
+    SaNtfNotificationTypeFilterHandlesT notificationFilterHandles = {
+      0, 0, stateChangeFilter.notificationFilterHandle, 0, 0
+    };
+
+    rc = saNtfNotificationSubscribe(&notificationFilterHandles, 1);
+
+    // The filter is freed whether or not the subscription succeeded; a
+    // subscribe error takes precedence over a free error.
+    const SaAisErrorT freeRc(
+        saNtfNotificationFilterFree(stateChangeFilter.notificationFilterHandle));
+    if (rc == SA_AIS_OK)
+      rc = freeRc;
+  }
+
+  if (rc == SA_AIS_OK)
+    rc = saNtfSelectionObjectGet(ntfHandle, &ntfSelObj);
+
+  if (rc != SA_AIS_OK) {
+    // Best effort: the caller only learns TET_FAIL, so close the session here.
+    saNtfFinalize(ntfHandle);
+    return TET_FAIL;
+  }
+
+  pollDone = false;
+  return TET_PASS;
+}

+

+// Finalizes the NTF handle and then sets pollDone so that
+// testNtfStateChange leaves its loop. Returns TET_PASS when saNtfFinalize
+// succeeded, TET_FAIL otherwise.
+static int testNtfCleanup(SaNtfHandleT ntfHandle) {
+  const bool finalized = (saNtfFinalize(ntfHandle) == SA_AIS_OK);
+
+  pollDone = true;
+
+  return finalized ? TET_PASS : TET_FAIL;
+}

+

+// Set with an arbitrary queue handle value (0xdeadbeef) is expected to
+// return SA_AIS_ERR_BAD_HANDLE.
+static void capacityThresholds_01(void) {
+  SaMsgQueueThresholdsT thresholds;
+  const SaAisErrorT rc =
+      saMsgQueueCapacityThresholdsSet(0xdeadbeef, &thresholds);
+
+  aisrc_validate(rc, SA_AIS_ERR_BAD_HANDLE);
+}

+

+// Set on a queue handle whose MSG handle has already been finalized is
+// expected to return SA_AIS_ERR_BAD_HANDLE.
+static void capacityThresholds_02(void) {
+  SaMsgHandleT hMsg;
+  SaAisErrorT rc(saMsgInitialize(&hMsg, 0, &msg3_1));
+  assert(rc == SA_AIS_OK);
+
+  const SaMsgQueueHandleT hQueue = openQueue(hMsg);
+
+  rc = saMsgFinalize(hMsg);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueThresholdsT thresholds;
+  rc = saMsgQueueCapacityThresholdsSet(hQueue, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_BAD_HANDLE);
+}

+

+// Set on a queue handle that has been closed is expected to return
+// SA_AIS_ERR_BAD_HANDLE.
+static void capacityThresholds_03(void) {
+  SaMsgHandleT hMsg;
+  SaAisErrorT rc(saMsgInitialize(&hMsg, 0, &msg3_1));
+  assert(rc == SA_AIS_OK);
+
+  const SaMsgQueueHandleT hQueue = openQueue(hMsg);
+
+  rc = saMsgQueueClose(hQueue);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueThresholdsT thresholds;
+  rc = saMsgQueueCapacityThresholdsSet(hQueue, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_BAD_HANDLE);
+
+  rc = saMsgFinalize(hMsg);
+  assert(rc == SA_AIS_OK);
+}

+

+// Set with a null thresholds pointer on a valid queue handle is expected to
+// return SA_AIS_ERR_INVALID_PARAM.
+static void capacityThresholds_04(void) {
+  SaMsgHandleT hMsg;
+  SaAisErrorT rc(saMsgInitialize(&hMsg, 0, &msg3_1));
+  assert(rc == SA_AIS_OK);
+
+  const SaMsgQueueHandleT hQueue = openQueue(hMsg);
+
+  rc = saMsgQueueCapacityThresholdsSet(hQueue, 0);
+  aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM);
+
+  rc = saMsgFinalize(hMsg);
+  assert(rc == SA_AIS_OK);
+}

+

+// Set where slot 0 of the first threshold array (4) is below slot 0 of the
+// second array (5) is expected to return SA_AIS_ERR_INVALID_PARAM.
+static void capacityThresholds_05(void) {
+  SaMsgHandleT hMsg;
+  SaAisErrorT rc(saMsgInitialize(&hMsg, 0, &msg3_1));
+  assert(rc == SA_AIS_OK);
+
+  const SaMsgQueueHandleT hQueue = openQueue(hMsg);
+  const SaMsgQueueThresholdsT thresholds = { { 4, 5, 5, 5 }, { 5, 5, 5, 5 } };
+
+  rc = saMsgQueueCapacityThresholdsSet(hQueue, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM);
+
+  rc = saMsgFinalize(hMsg);
+  assert(rc == SA_AIS_OK);
+}

+

+// Set where slot 1 of the first threshold array (4) is below slot 1 of the
+// second array (5) is expected to return SA_AIS_ERR_INVALID_PARAM.
+static void capacityThresholds_06(void) {
+  SaMsgHandleT hMsg;
+  SaAisErrorT rc(saMsgInitialize(&hMsg, 0, &msg3_1));
+  assert(rc == SA_AIS_OK);
+
+  const SaMsgQueueHandleT hQueue = openQueue(hMsg);
+  const SaMsgQueueThresholdsT thresholds = { { 5, 4, 5, 5 }, { 5, 5, 5, 5 } };
+
+  rc = saMsgQueueCapacityThresholdsSet(hQueue, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM);
+
+  rc = saMsgFinalize(hMsg);
+  assert(rc == SA_AIS_OK);
+}

+

+// Set where slot 2 of the first threshold array (4) is below slot 2 of the
+// second array (5) is expected to return SA_AIS_ERR_INVALID_PARAM.
+static void capacityThresholds_07(void) {
+  SaMsgHandleT hMsg;
+  SaAisErrorT rc(saMsgInitialize(&hMsg, 0, &msg3_1));
+  assert(rc == SA_AIS_OK);
+
+  const SaMsgQueueHandleT hQueue = openQueue(hMsg);
+  const SaMsgQueueThresholdsT thresholds = { { 5, 5, 4, 5 }, { 5, 5, 5, 5 } };
+
+  rc = saMsgQueueCapacityThresholdsSet(hQueue, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM);
+
+  rc = saMsgFinalize(hMsg);
+  assert(rc == SA_AIS_OK);
+}

+

+// Set where slot 3 of the first threshold array (4) is below slot 3 of the
+// second array (5) is expected to return SA_AIS_ERR_INVALID_PARAM.
+static void capacityThresholds_08(void) {
+  SaMsgHandleT hMsg;
+  SaAisErrorT rc(saMsgInitialize(&hMsg, 0, &msg3_1));
+  assert(rc == SA_AIS_OK);
+
+  const SaMsgQueueHandleT hQueue = openQueue(hMsg);
+  const SaMsgQueueThresholdsT thresholds = { { 5, 5, 5, 4 }, { 5, 5, 5, 5 } };
+
+  rc = saMsgQueueCapacityThresholdsSet(hQueue, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM);
+
+  rc = saMsgFinalize(hMsg);
+  assert(rc == SA_AIS_OK);
+}

+

+// Set from a client that initialized the library with version {'B', 1, 0}
+// is expected to return SA_AIS_ERR_VERSION.
+static void capacityThresholds_09(void) {
+  SaVersionT msg1_1 = {'B', 1, 0};
+  SaMsgHandleT hMsg;
+  SaAisErrorT rc(saMsgInitialize(&hMsg, 0, &msg1_1));
+  assert(rc == SA_AIS_OK);
+
+  const SaMsgQueueHandleT hQueue = openQueue(hMsg);
+  const SaMsgQueueThresholdsT thresholds = { { 5, 5, 5, 5 }, { 5, 5, 5, 5 } };
+
+  rc = saMsgQueueCapacityThresholdsSet(hQueue, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_VERSION);
+
+  rc = saMsgFinalize(hMsg);
+  assert(rc == SA_AIS_OK);
+}

+

+// Set where slot 0 of the first threshold array is 6, above the 5 used for
+// every size in creationAttributes, is expected to return
+// SA_AIS_ERR_INVALID_PARAM.
+static void capacityThresholds_10(void) {
+  SaMsgHandleT hMsg;
+  SaAisErrorT rc(saMsgInitialize(&hMsg, 0, &msg3_1));
+  assert(rc == SA_AIS_OK);
+
+  const SaMsgQueueHandleT hQueue = openQueue(hMsg);
+  const SaMsgQueueThresholdsT thresholds = { { 6, 5, 5, 5 }, { 5, 5, 5, 5 } };
+
+  rc = saMsgQueueCapacityThresholdsSet(hQueue, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM);
+
+  rc = saMsgFinalize(hMsg);
+  assert(rc == SA_AIS_OK);
+}

+

+// Set where slot 1 of the first threshold array is 6, above the 5 used for
+// every size in creationAttributes, is expected to return
+// SA_AIS_ERR_INVALID_PARAM.
+static void capacityThresholds_11(void) {
+  SaMsgHandleT hMsg;
+  SaAisErrorT rc(saMsgInitialize(&hMsg, 0, &msg3_1));
+  assert(rc == SA_AIS_OK);
+
+  const SaMsgQueueHandleT hQueue = openQueue(hMsg);
+  const SaMsgQueueThresholdsT thresholds = { { 5, 6, 5, 5 }, { 5, 5, 5, 5 } };
+
+  rc = saMsgQueueCapacityThresholdsSet(hQueue, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM);
+
+  rc = saMsgFinalize(hMsg);
+  assert(rc == SA_AIS_OK);
+}

+

+// Set where slot 2 of the first threshold array is 6, above the 5 used for
+// every size in creationAttributes, is expected to return
+// SA_AIS_ERR_INVALID_PARAM.
+static void capacityThresholds_12(void) {
+  SaMsgHandleT hMsg;
+  SaAisErrorT rc(saMsgInitialize(&hMsg, 0, &msg3_1));
+  assert(rc == SA_AIS_OK);
+
+  const SaMsgQueueHandleT hQueue = openQueue(hMsg);
+  const SaMsgQueueThresholdsT thresholds = { { 5, 5, 6, 5 }, { 5, 5, 5, 5 } };
+
+  rc = saMsgQueueCapacityThresholdsSet(hQueue, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM);
+
+  rc = saMsgFinalize(hMsg);
+  assert(rc == SA_AIS_OK);
+}

+

+// Set where slot 3 of the first threshold array is 6, above the 5 used for
+// every size in creationAttributes, is expected to return
+// SA_AIS_ERR_INVALID_PARAM.
+static void capacityThresholds_13(void) {
+  SaMsgHandleT hMsg;
+  SaAisErrorT rc(saMsgInitialize(&hMsg, 0, &msg3_1));
+  assert(rc == SA_AIS_OK);
+
+  const SaMsgQueueHandleT hQueue = openQueue(hMsg);
+  const SaMsgQueueThresholdsT thresholds = { { 5, 5, 5, 6 }, { 5, 5, 5, 5 } };
+
+  rc = saMsgQueueCapacityThresholdsSet(hQueue, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM);
+
+  rc = saMsgFinalize(hMsg);
+  assert(rc == SA_AIS_OK);
+}

+

+static void capacityThresholds_14(void) {

+  SaMsgHandleT msgHandle;

+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);

+  assert(rc == SA_AIS_OK);

+

+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));

+

+  const SaMsgQueueThresholdsT thresholds = {

+    { 5, 5, 5, 5 }, { 5, 5, 5, 5 }

+  };

+

+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);

+  aisrc_validate(rc, SA_AIS_OK);

+

+  rc = saMsgFinalize(msgHandle);

+  assert(rc == SA_AIS_OK);

+}

+

+static void capacityThresholds_15(void) {

+  SaMsgQueueThresholdsT thresholds;

+  SaAisErrorT rc(saMsgQueueCapacityThresholdsGet(0xdeadbeef, &thresholds));

+  aisrc_validate(rc, SA_AIS_ERR_BAD_HANDLE);

+}

+

+static void capacityThresholds_16(void) {

+  SaMsgHandleT msgHandle;

+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);

+  assert(rc == SA_AIS_OK);

+

+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));

+

+  rc = saMsgFinalize(msgHandle);

+  assert(rc == SA_AIS_OK);

+

+  SaMsgQueueThresholdsT thresholds;

+  rc = saMsgQueueCapacityThresholdsGet(queueHandle, &thresholds);

+  aisrc_validate(rc, SA_AIS_ERR_BAD_HANDLE);

+}

+

+static void capacityThresholds_17(void) {

+  SaMsgHandleT msgHandle;

+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);

+  assert(rc == SA_AIS_OK);

+

+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));

+

+  rc = saMsgQueueClose(queueHandle);

+  assert(rc == SA_AIS_OK);

+

+  SaMsgQueueThresholdsT thresholds;

+  rc = saMsgQueueCapacityThresholdsGet(queueHandle, &thresholds);

+  aisrc_validate(rc, SA_AIS_ERR_BAD_HANDLE);

+

+  rc = saMsgFinalize(msgHandle);

+  assert(rc == SA_AIS_OK);

+}

+

+static void capacityThresholds_18(void) {

+  SaMsgHandleT msgHandle;

+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);

+  assert(rc == SA_AIS_OK);

+

+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));

+

+  rc = saMsgQueueCapacityThresholdsGet(queueHandle, 0);

+  aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM);

+

+  rc = saMsgFinalize(msgHandle);

+  assert(rc == SA_AIS_OK);

+}

+

+static void capacityThresholds_19(void) {

+  SaMsgHandleT msgHandle;

+  SaVersionT msg1_1 = {'B', 1, 0};

+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg1_1);

+  assert(rc == SA_AIS_OK);

+

+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));

+

+  SaMsgQueueThresholdsT thresholds;

+  rc = saMsgQueueCapacityThresholdsGet(queueHandle, &thresholds);

+  aisrc_validate(rc, SA_AIS_ERR_VERSION);

+

+  rc = saMsgFinalize(msgHandle);

+  assert(rc == SA_AIS_OK);

+}

+

+static void capacityThresholds_20(void) {

+  SaMsgHandleT msgHandle;

+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);

+  assert(rc == SA_AIS_OK);

+

+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));

+

+  const SaMsgQueueThresholdsT thresholds = {

+    { 1, 2, 3, 4 }, { 1, 2, 3, 4 }

+  };

+

+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);

+  assert(rc == SA_AIS_OK);

+

+  SaMsgQueueThresholdsT thresholdsGet;

+  rc = saMsgQueueCapacityThresholdsGet(queueHandle, &thresholdsGet);

+  aisrc_validate(rc, SA_AIS_OK);

+

+  assert(!memcmp(&thresholds, &thresholdsGet, sizeof(SaMsgQueueThresholdsT)));

+

+  rc = saMsgFinalize(msgHandle);

+  assert(rc == SA_AIS_OK);

+}

+

+static void capacityThresholds_21(void) {

+  int result(TET_PASS);

+

+  do {

+    SaMsgHandleT msgHandle;

+    SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);

+    assert(rc == SA_AIS_OK);

+

+    SaMsgQueueHandleT queueHandle(openQueue(msgHandle));

+

+    const SaMsgQueueThresholdsT thresholds = {

+      { 3, 3, 3, 3 }, { 2, 2, 2, 2 }

+    };

+

+    rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);

+    assert(rc == SA_AIS_OK);

+

+    SaNtfHandleT ntfHandle;

+    SaSelectionObjectT ntfSelObj;

+    result = testNtfInit(ntfHandle, ntfSelObj);

+    if (result != TET_PASS)

+      break;

+

+    ready = false;

+    std::thread t(&testNtfStateChange, ntfHandle, ntfSelObj);

+    testQueue.push(SA_MSG_QUEUE_CAPACITY_REACHED);

+

+    /* fill up the q */

+    for (SaUint8T i(SA_MSG_MESSAGE_HIGHEST_PRIORITY);

+         i <= SA_MSG_MESSAGE_LOWEST_PRIORITY;

+         i++) {

+      char data[] = "abcd";

+

+      SaMsgMessageT message = {

+        1, 1, sizeof(data) - 1, 0, data, i

+      };

+

+      rc = saMsgMessageSend(msgHandle, &queueName, &message, SA_TIME_ONE_SECOND);

+      assert(rc == SA_AIS_OK);

+    }

+

+    // wait for the notifications

+    {

+      std::unique_lock<std::mutex> lk(m);

+      cv.wait(lk, []{ return ready; });

+    }

+

+    result = resultQueue.front();

+    resultQueue.pop();

+    if (result != TET_PASS)

+      break;

+

+    result = testNtfCleanup(ntfHandle);

+    if (result != TET_PASS)

+      break;

+

+    rc = saMsgFinalize(msgHandle);

+    assert(rc == SA_AIS_OK);

+

+    t.join();

+  } while (false);

+

+  test_validate(result, TET_PASS);

+

+  // let the queue close

+  sleep(2);

+}

+

+static void capacityThresholds_22(void) {

+  int result(TET_PASS);

+

+  do {

+    SaMsgHandleT msgHandle;

+    SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);

+    assert(rc == SA_AIS_OK);

+

+    SaMsgQueueHandleT queueHandle(openQueue(msgHandle));

+

+    const SaMsgQueueThresholdsT thresholds = {

+      { 3, 3, 3, 3 }, { 2, 2, 2, 2 }

+    };

+

+    rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);

+    assert(rc == SA_AIS_OK);

+

+    SaNtfHandleT ntfHandle;

+    SaSelectionObjectT ntfSelObj;

+    result = testNtfInit(ntfHandle, ntfSelObj);

+    if (result != TET_PASS)

+      break;

+

+    ready = false;

+    std::thread t(&testNtfStateChange, ntfHandle, ntfSelObj);

+    testQueue.push(SA_MSG_QUEUE_CAPACITY_REACHED);

+

+    /* fill up the q */

+    for (SaUint8T i(SA_MSG_MESSAGE_HIGHEST_PRIORITY);

+         i <= SA_MSG_MESSAGE_LOWEST_PRIORITY;

+         i++) {

+      char data[] = "abcd";

+

+      SaMsgMessageT message = {

+        1, 1, sizeof(data) - 1, 0, data, i

+      };

+

+      rc = saMsgMessageSend(msgHandle, &queueName, &message, SA_TIME_ONE_SECOND);

+      assert(rc == SA_AIS_OK);

+    }

+

+    // wait for the notifications

+    {

+      std::unique_lock<std::mutex> lk(m);

+      cv.wait(lk, []{ return ready; });

+    }

+

+    result = resultQueue.front();

+    resultQueue.pop();

+    if (result != TET_PASS)

+      break;

+

+    ready = false;

+

+    testQueue.push(SA_MSG_QUEUE_CAPACITY_AVAILABLE);

+

+    /* read a message */

+    char data[4];

+    SaMsgMessageT msg;

+    SaMsgSenderIdT senderId;

+    memset(&msg, 0, sizeof(msg));

+    msg.data = &data;

+    msg.size = sizeof(data);

+

+    rc = saMsgMessageGet(queueHandle, &msg, 0, &senderId, SA_TIME_ONE_SECOND);

+    assert(rc == SA_AIS_OK);

+

+    // wait for the notifications

+    {

+      std::unique_lock<std::mutex> lk(m);

+      cv.wait(lk, []{ return ready; });

+    }

+

+    result = resultQueue.front();

+    resultQueue.pop();

+    if (result != TET_PASS)

+      break;

+

+    result = testNtfCleanup(ntfHandle);

+    if (result != TET_PASS)

+      break;

+

+    rc = saMsgFinalize(msgHandle);

+    assert(rc == SA_AIS_OK);

+

+    t.join();

+  } while (false);

+

+  test_validate(result, TET_PASS);

+

+  // let the queue close

+  sleep(2);

+}

+

+static void capacityThresholds_23(void) {

+  int result(TET_PASS);

+

+  do {

+    SaMsgHandleT msgHandle;

+    SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);

+    assert(rc == SA_AIS_OK);

+

+    SaMsgQueueHandleT queueHandle(openQueue(msgHandle));

+

+    const SaMsgQueueThresholdsT thresholds = {

+      { 3, 3, 3, 3 }, { 2, 2, 2, 2 }

+    };

+

+    rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);

+    assert(rc == SA_AIS_OK);

+

+    rc = saMsgQueueGroupCreate(msgHandle,

+                               &queueGroupName,

+                               SA_MSG_QUEUE_GROUP_ROUND_ROBIN);

+    assert(rc == SA_AIS_OK);

+

+    rc = saMsgQueueGroupInsert(msgHandle, &queueGroupName, &queueName);

+    assert(rc == SA_AIS_OK);

+

+    SaNtfHandleT ntfHandle;

+    SaSelectionObjectT ntfSelObj;

+    result = testNtfInit(ntfHandle, ntfSelObj);

+    if (result != TET_PASS)

+      break;

+

+    ready = false;

+    std::thread t(&testNtfStateChange, ntfHandle, ntfSelObj);

+    testQueue.push(SA_MSG_QUEUE_CAPACITY_REACHED);

+    testQueue.push(SA_MSG_QUEUE_GROUP_CAPACITY_REACHED);

+

+    /* fill up the q */

+    for (SaUint8T i(SA_MSG_MESSAGE_HIGHEST_PRIORITY);

+         i <= SA_MSG_MESSAGE_LOWEST_PRIORITY;

+         i++) {

+      char data[] = "abcd";

+

+      SaMsgMessageT message = {

+        1, 1, sizeof(data) - 1, 0, data, i

+      };

+

+      rc = saMsgMessageSend(msgHandle,

+                            &queueGroupName,

+                            &message,

+                            SA_TIME_ONE_SECOND);

+      assert(rc == SA_AIS_OK);

+    }

+

+    // wait for the notifications

+    {

+      std::unique_lock<std::mutex> lk(m);

+      cv.wait(lk, []{ return ready; });

+    }

+

+    ready = false;

+

+    result = resultQueue.front();

+    resultQueue.pop();

+    if (result != TET_PASS)

+      break;

+

+    result = resultQueue.front();

+    resultQueue.pop();

+    if (result != TET_PASS)

+      break;

+

+    result = testNtfCleanup(ntfHandle);

+    if (result != TET_PASS) {

+      break;

+    }

+

+    rc = saMsgQueueGroupDelete(msgHandle, &queueGroupName);

+    assert(rc == SA_AIS_OK);

+

+    rc = saMsgFinalize(msgHandle);

+    assert(rc == SA_AIS_OK);

+

+    t.join();

+  } while (false);

+

+  test_validate(result, TET_PASS);

+

+  // let the queue close

+  sleep(2);

+}

+

+static void capacityThresholds_24(void) {

+  int result(TET_PASS);

+

+  do {

+    SaMsgHandleT msgHandle;

+    SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);

+    assert(rc == SA_AIS_OK);

+

+    SaMsgQueueHandleT queueHandle(openQueue(msgHandle));

+

+    const SaMsgQueueThresholdsT thresholds = {

+      { 3, 3, 3, 3 }, { 2, 2, 2, 2 }

+    };

+

+    rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);

+    assert(rc == SA_AIS_OK);

+

+    rc = saMsgQueueGroupCreate(msgHandle,

+                               &queueGroupName,

+                               SA_MSG_QUEUE_GROUP_ROUND_ROBIN);

+    assert(rc == SA_AIS_OK);

+

+    rc = saMsgQueueGroupInsert(msgHandle, &queueGroupName, &queueName);

+    assert(rc == SA_AIS_OK);

+

+    SaNtfHandleT ntfHandle;

+    SaSelectionObjectT ntfSelObj;

+    result = testNtfInit(ntfHandle, ntfSelObj);

+    if (result != TET_PASS) {

+      break;

+    }

+

+    ready = false;

+    std::thread t(&testNtfStateChange, ntfHandle, ntfSelObj);

+    testQueue.push(SA_MSG_QUEUE_CAPACITY_REACHED);

+    testQueue.push(SA_MSG_QUEUE_GROUP_CAPACITY_REACHED);

+

+    /* fill up the q */

+    for (SaUint8T i(SA_MSG_MESSAGE_HIGHEST_PRIORITY);

+         i <= SA_MSG_MESSAGE_LOWEST_PRIORITY;

+         i++) {

+      char data[] = "abcd";

+

+      SaMsgMessageT message = {

+        1, 1, sizeof(data) - 1, 0, data, i

+      };

+

+      rc = saMsgMessageSend(msgHandle, &queueName, &message, SA_TIME_ONE_SECOND);

+      assert(rc == SA_AIS_OK);

+    }

+

+    // wait for the notifications

+    {

+      std::unique_lock<std::mutex> lk(m);

+      cv.wait(lk, []{ return ready; });

+    }

+

+    result = resultQueue.front();

+    resultQueue.pop();

+    if (result != TET_PASS)

+      break;

+

+    result = resultQueue.front();

+    resultQueue.pop();

+    if (result != TET_PASS)

+      break;

+

+    ready = false;

+

+    testQueue.push(SA_MSG_QUEUE_CAPACITY_AVAILABLE);

+    testQueue.push(SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE);

+

+    /* read a message */

+    char data[4];

+    SaMsgMessageT msg;

+    SaMsgSenderIdT senderId;

+    memset(&msg, 0, sizeof(msg));

+    msg.data = &data;

+    msg.size = sizeof(data);

+

+    rc = saMsgMessageGet(queueHandle, &msg, 0, &senderId, SA_TIME_ONE_SECOND);

+    assert(rc == SA_AIS_OK);

+

+    {

+      std::unique_lock<std::mutex> lk(m);

+      cv.wait(lk, []{ return ready; });

+    }

+

+    result = resultQueue.front();

+    resultQueue.pop();

+    if (result != TET_PASS)

+      break;

+

+    result = resultQueue.front();

+    resultQueue.pop();

+    if (result != TET_PASS)

+      break;

+

+    result = testNtfCleanup(ntfHandle);

+    if (result != TET_PASS) {

+      break;

+    }

+

+    rc = saMsgQueueGroupDelete(msgHandle, &queueGroupName);

+    assert(rc == SA_AIS_OK);

+

+    rc = saMsgFinalize(msgHandle);

+    assert(rc == SA_AIS_OK);

+

+    t.join();

+  } while (false);

+

+  test_validate(result, TET_PASS);

+

+  // let the queue close

+  sleep(2);

+}

+

+__attribute__((constructor)) static void capacityThresholds_constructor(void) {

+  test_suite_add(27, "Capacity Thresholds Test Suite");

+  test_case_add(27,

+                capacityThresholds_01,

+                "saMsgQueueCapacityThresholdsSet returns BAD_HANDLE when "

+                "queueHandle is bad");

+  test_case_add(27,

+                capacityThresholds_02,

+                "saMsgQueueCapacityThresholdsSet with finalized handle");

+  test_case_add(27,

+                capacityThresholds_03,

+                "saMsgQueueCapacityThresholdsSet with closed queue");

+  test_case_add(27,

+                capacityThresholds_04,

+                "saMsgQueueCapacityThresholdsSet with null thresholds pointer");

+  test_case_add(27,

+                capacityThresholds_05,

+                "saMsgQueueCapacityThresholdsSet with capacityAvailable > "

+                "capacityReached (1)");

+  test_case_add(27,

+                capacityThresholds_06,

+                "saMsgQueueCapacityThresholdsSet with capacityAvailable > "

+                "capacityReached (2)");

+  test_case_add(27,

+                capacityThresholds_07,

+                "saMsgQueueCapacityThresholdsSet with capacityAvailable > "

+                "capacityReached (3)");

+  test_case_add(27,

+                capacityThresholds_08,

+                "saMsgQueueCapacityThresholdsSet with capacityAvailable > "

+                "capacityReached (4)");

+  test_case_add(27,

+                capacityThresholds_09,

+                "saMsgQueueCapacityThresholdsSet with version != B.03.01");

+  test_case_add(27,

+                capacityThresholds_10,

+                "saMsgQueueCapacityThresholdsSet with size < capacityReached "

+                "(1)");

+  test_case_add(27,

+                capacityThresholds_11,

+                "saMsgQueueCapacityThresholdsSet with size < capacityReached "

+                "(2)");

+  test_case_add(27,

+                capacityThresholds_12,

+                "saMsgQueueCapacityThresholdsSet with size < capacityReached "

+                "(3)");

+  test_case_add(27,

+                capacityThresholds_13,

+                "saMsgQueueCapacityThresholdsSet with size < capacityReached "

+                "(4)");

+  test_case_add(27,

+                capacityThresholds_14,

+                "saMsgQueueCapacityThresholdsSet success");

+  test_case_add(27,

+                capacityThresholds_15,

+                "saMsgQueueCapacityThresholdsGet returns BAD_HANDLE when "

+                "queueHandle is bad");

+  test_case_add(27,

+                capacityThresholds_16,

+                "saMsgQueueCapacityThresholdsGet with finalized handle");

+  test_case_add(27,

+                capacityThresholds_17,

+                "saMsgQueueCapacityThresholdsGet with closed queue");

+  test_case_add(27,

+                capacityThresholds_18,

+                "saMsgQueueCapacityThresholdsGet with null thresholds pointer");

+  test_case_add(27,

+                capacityThresholds_19,

+                "saMsgQueueCapacityThresholdsGet with version != B.03.01");

+  test_case_add(27,

+                capacityThresholds_20,

+                "saMsgQueueCapacityThresholdsGet success");

+  test_case_add(27,

+                capacityThresholds_21,

+                "Capacity Reached Notification sent");

+  test_case_add(27,

+                capacityThresholds_22,

+                "Capacity Available Notification sent");

+  test_case_add(27,

+                capacityThresholds_23,

+                "Capacity Reached Notification sent (group)");

+  test_case_add(27,

+                capacityThresholds_24,

+                "Capacity Available Notification sent (group)");

+}

diff --git a/src/msg/apitest/test_ErrUnavailable.cc b/src/msg/apitest/test_ErrUnavailable.cc

index 3f37f8d..dc1b371 100644

--- a/src/msg/apitest/test_ErrUnavailable.cc

+++ b/src/msg/apitest/test_ErrUnavailable.cc

@@ -504,6 +504,59 @@ static void saErrUnavailable_25(void)

   assert(rc == SA_AIS_OK);

 }



+static void saErrUnavailable_26(void)

+{

+  SaMsgHandleT msgHandle;

+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);

+  assert(rc == SA_AIS_OK);

+

+  SaMsgQueueHandleT queueHandle;

+  rc = saMsgQueueOpen(msgHandle,

+                      &queueName,

+                      &creationAttributes,

+                      SA_MSG_QUEUE_CREATE,

+                      SA_TIME_END,

+                      &queueHandle);

+  assert(rc == SA_AIS_OK);

+

+  lockUnlockNode(true);

+  const SaMsgQueueThresholdsT thresholds = {

+       { 5, 5, 5, 5 },

+       { 5, 5, 5, 5 }

+  };

+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);

+  test_validate(rc, SA_AIS_ERR_UNAVAILABLE);

+  lockUnlockNode(false);

+

+  rc = saMsgFinalize(msgHandle);

+  assert(rc == SA_AIS_OK);

+}

+

+static void saErrUnavailable_27(void)

+{

+  SaMsgHandleT msgHandle;

+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);

+  assert(rc == SA_AIS_OK);

+

+  SaMsgQueueHandleT queueHandle;

+  rc = saMsgQueueOpen(msgHandle,

+                      &queueName,

+                      &creationAttributes,

+                      SA_MSG_QUEUE_CREATE,

+                      SA_TIME_END,

+                      &queueHandle);

+  assert(rc == SA_AIS_OK);

+

+  lockUnlockNode(true);

+  SaMsgQueueThresholdsT thresholds;

+  rc = saMsgQueueCapacityThresholdsGet(queueHandle, &thresholds);

+  test_validate(rc, SA_AIS_ERR_UNAVAILABLE);

+  lockUnlockNode(false);

+

+  rc = saMsgFinalize(msgHandle);

+  assert(rc == SA_AIS_OK);

+}

+

 static void saErrUnavailable_30(void)

 {

   lockUnlockNode(true);

@@ -940,6 +993,59 @@ static void saErrUnavailable_54(void)

   assert(rc == SA_AIS_OK);

 }



+static void saErrUnavailable_55(void)

+{

+  SaMsgHandleT msgHandle;

+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);

+  assert(rc == SA_AIS_OK);

+

+  SaMsgQueueHandleT queueHandle;

+  rc = saMsgQueueOpen(msgHandle,

+                      &queueName,

+                      &creationAttributes,

+                      SA_MSG_QUEUE_CREATE,

+                      SA_TIME_END,

+                      &queueHandle);

+  assert(rc == SA_AIS_OK);

+

+  lockUnlockNode(true);

+  lockUnlockNode(false);

+  const SaMsgQueueThresholdsT thresholds = {

+       { 5, 5, 5, 5 },

+       { 5, 5, 5, 5 }

+  };

+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);

+  test_validate(rc, SA_AIS_ERR_UNAVAILABLE);

+

+  rc = saMsgFinalize(msgHandle);

+  assert(rc == SA_AIS_OK);

+}

+

+static void saErrUnavailable_56(void)

+{

+  SaMsgHandleT msgHandle;

+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);

+  assert(rc == SA_AIS_OK);

+

+  SaMsgQueueHandleT queueHandle;

+  rc = saMsgQueueOpen(msgHandle,

+                      &queueName,

+                      &creationAttributes,

+                      SA_MSG_QUEUE_CREATE,

+                      SA_TIME_END,

+                      &queueHandle);

+  assert(rc == SA_AIS_OK);

+

+  lockUnlockNode(true);

+  lockUnlockNode(false);

+  SaMsgQueueThresholdsT thresholds;

+  rc = saMsgQueueCapacityThresholdsGet(queueHandle, &thresholds);

+  test_validate(rc, SA_AIS_ERR_UNAVAILABLE);

+

+  rc = saMsgFinalize(msgHandle);

+  assert(rc == SA_AIS_OK);

+}

+



 __attribute__((constructor)) static void saErrUnavailable_constructor(void)

 {

@@ -969,9 +1075,9 @@ __attribute__((constructor)) static void saErrUnavailable_constructor(void)

   test_case_add(26, saErrUnavailable_23, "saMsgMessageSendReceive");

   test_case_add(26, saErrUnavailable_24, "saMsgMessageReply");

   test_case_add(26, saErrUnavailable_25, "saMsgMessageReplyAsync");

-#if 0

   test_case_add(26, saErrUnavailable_26, "saMsgQueueCapacityThresholdsSet");

   test_case_add(26, saErrUnavailable_27, "saMsgQueueCapacityThresholdsGet");

+#if 0

   test_case_add(26, saErrUnavailable_28, "saMsgMetadataSizeGet");

   test_case_add(26, saErrUnavailable_29, "saMsgLimitGet");

 #endif

@@ -1000,9 +1106,9 @@ __attribute__((constructor)) static void saErrUnavailable_constructor(void)

   test_case_add(26, saErrUnavailable_52, "saMsgMessageSendReceive (stale)");

   test_case_add(26, saErrUnavailable_53, "saMsgMessageReply (stale)");

   test_case_add(26, saErrUnavailable_54, "saMsgMessageReplyAsync (stale)");

-#if 0

   test_case_add(26, saErrUnavailable_55, "saMsgQueueCapacityThresholdsSet (stale)");

   test_case_add(26, saErrUnavailable_56, "saMsgQueueCapacityThresholdsGet (stale)");

+#if 0

   test_case_add(26, saErrUnavailable_57, "saMsgMetadataSizeGet (stale)");

   test_case_add(26, saErrUnavailable_58, "saMsgLimitGet (stale)");

 #endif

diff --git a/src/msg/common/mqsv_edu.c b/src/msg/common/mqsv_edu.c

index 7b0749b..a5301da 100644

--- a/src/msg/common/mqsv_edu.c

+++ b/src/msg/common/mqsv_edu.c

@@ -477,7 +477,8 @@ static int mqsv_mqp_req_test_type_fnc(NCSCONTEXT arg)

               LCL_TEST_JUMP_OFFSET_MQP_EVT_TRANSFER_QUEUE_COMPLETE = 60,

               LCL_TEST_JUMP_OFFSET_MQP_EVT_UPDATE_STATS = 62,

               LCL_TEST_JUMP_OFFSET_MQP_EVT_RET_TIME_SET_REQ = 65,

-              LCL_TEST_JUMP_OFFSET_MQP_EVT_CLM_NOTIFY = 67 };

+              LCL_TEST_JUMP_OFFSET_MQP_EVT_CLM_NOTIFY = 67,

+              LCL_TEST_JUMP_OFFSET_MQP_EVT_CAPACITY = 68 };

        MQP_REQ_TYPE type;



        if (arg == NULL)

@@ -516,6 +517,9 @@ static int mqsv_mqp_req_test_type_fnc(NCSCONTEXT arg)

                return LCL_TEST_JUMP_OFFSET_MQP_EVT_RET_TIME_SET_REQ;

        case MQP_EVT_CLM_NOTIFY:

                return LCL_TEST_JUMP_OFFSET_MQP_EVT_CLM_NOTIFY;

+       case MQP_EVT_CAP_SET_REQ:

+       case MQP_EVT_CAP_GET_REQ:

+               return LCL_TEST_JUMP_OFFSET_MQP_EVT_CAPACITY;



        default:

                break;

@@ -578,6 +582,55 @@ static uint32_t mqsv_edp_samsgqueuecreationattributest(



 /*****************************************************************************



+  PROCEDURE NAME:   mqsv_edp_samsgqueuethresholdst

+

+  DESCRIPTION:      EDU program handler for "SaMsgQueueThresholdsT" data. This

+function is invoked by EDU for performing encode/decode operation on

+"SaMsgQueueThresholdsT" data.

+

+  RETURNS:          NCSCC_RC_SUCCESS/NCSCC_RC_FAILURE

+

+*****************************************************************************/

+static uint32_t mqsv_edp_samsgqueuethresholdst(

+    EDU_HDL *hdl, EDU_TKN *edu_tkn, NCSCONTEXT ptr, uint32_t *ptr_data_len,

+    EDU_BUF_ENV *buf_env, EDP_OP_TYPE op, EDU_ERR *o_err)

+{

+       uint32_t rc = NCSCC_RC_SUCCESS;

+       SaMsgQueueThresholdsT *struct_ptr = NULL, **d_ptr = NULL;

+       EDU_INST_SET mqsv_samsgqueuethresholds_rules[] = {

+           {EDU_START, mqsv_edp_samsgqueuethresholdst, 0, 0, 0,

+            sizeof(SaMsgQueueThresholdsT), 0, NULL},

+           {EDU_EXEC, m_NCS_EDP_SASIZET, EDQ_ARRAY, 0, 0,

+            (long)&((SaMsgQueueThresholdsT *)0)->capacityReached,

+            SA_MSG_MESSAGE_LOWEST_PRIORITY + 1, NULL},

+           {EDU_EXEC, m_NCS_EDP_SASIZET, EDQ_ARRAY, 0, 0,

+            (long)&((SaMsgQueueThresholdsT *)0)->capacityAvailable,

+            SA_MSG_MESSAGE_LOWEST_PRIORITY + 1, NULL},

+           {EDU_END, 0, 0, 0, 0, 0, 0, NULL},

+       };

+

+       if (op == EDP_OP_TYPE_ENC) {

+               struct_ptr = (SaMsgQueueThresholdsT *)ptr;

+       } else if (op == EDP_OP_TYPE_DEC) {

+               d_ptr = (SaMsgQueueThresholdsT **)ptr;

+               if (*d_ptr == NULL) {

+                       /* This should have already been a valid pointer. */

+                       *o_err = EDU_ERR_MEM_FAIL;

+                       return NCSCC_RC_FAILURE;

+               }

+               memset(*d_ptr, '\0', sizeof(SaMsgQueueThresholdsT));

+               struct_ptr = *d_ptr;

+       } else {

+               struct_ptr = ptr;

+       }

+       rc = m_NCS_EDU_RUN_RULES(hdl, edu_tkn,

+                                mqsv_samsgqueuethresholds_rules,

+                                struct_ptr, ptr_data_len, buf_env, op, o_err);

+       return rc;

+}

+

+/*****************************************************************************

+

   PROCEDURE NAME:   mqsv_edp_mqp_req



   DESCRIPTION:      EDU program handler for "MQP_REQ_MSG" data. This

@@ -802,6 +855,13 @@ static uint32_t mqsv_edp_mqp_req(EDU_HDL *hdl, EDU_TKN *edu_tkn, NCSCONTEXT ptr,

            {EDU_EXEC, ncs_edp_uns32, 0, 0, 0,

             (long)&((MQP_REQ_MSG *)0)->info.clmNotify.node_joined, 0, NULL},



+           /* MQP_EVT_CAP_REQ */

+           {EDU_EXEC, m_NCS_EDP_SAMSGQUEUEHANDLET, 0, 0, 0,

+            (long)&((MQP_REQ_MSG *)0)->info.capacity.queueHandle, 0, NULL},

+           {EDU_EXEC, mqsv_edp_samsgqueuethresholdst, 0, 0, 0,

+            (long)&((MQP_REQ_MSG *)0)->info.capacity.thresholds,

+            0, NULL},

+

            {EDU_END, 0, 0, 0, 0, 0, 0, NULL},



        };

@@ -956,7 +1016,7 @@ static int mqsv_mqp_rsp_test_type_fnc(NCSCONTEXT arg)

               LCL_TEST_JUMP_OFFSET_MQP_EVT_TRANSFER_QUEUE_RSP,

               LCL_TEST_JUMP_OFFSET_MQP_EVT_ND_RESTART_RSP = 29,

               LCL_TEST_JUMP_OFFSET_MQP_EVT_RET_TIME_SET_RSP,

-

+              LCL_TEST_JUMP_OFFSET_MQP_EVT_CAP_RSP = 32

        };

        MQP_RSP_TYPE type;



@@ -988,6 +1048,9 @@ static int mqsv_mqp_rsp_test_type_fnc(NCSCONTEXT arg)

                return LCL_TEST_JUMP_OFFSET_MQP_EVT_ND_RESTART_RSP;

        case MQP_EVT_Q_RET_TIME_SET_RSP:

                return LCL_TEST_JUMP_OFFSET_MQP_EVT_RET_TIME_SET_RSP;

+       case MQP_EVT_CAP_SET_RSP:

+       case MQP_EVT_CAP_GET_RSP:

+               return LCL_TEST_JUMP_OFFSET_MQP_EVT_CAP_RSP;

        default:

                break;

        }

@@ -1109,6 +1172,13 @@ static uint32_t mqsv_edp_mqp_rsp(EDU_HDL *hdl, EDU_TKN *edu_tkn, NCSCONTEXT ptr,

             (long)&((MQP_RSP_MSG *)0)->info.retTimeSetRsp.queueHandle, 0,

             NULL},



+           /* MQP_EVT_CAP_RSP */

+           {EDU_EXEC, m_NCS_EDP_SAMSGQUEUEHANDLET, 0, 0, 0,

+            (long)&((MQP_RSP_MSG *)0)->info.capacity.queueHandle, 0, NULL},

+           {EDU_EXEC, mqsv_edp_samsgqueuethresholdst, 0, 0, 0,

+            (long)&((MQP_RSP_MSG *)0)->info.capacity.thresholds,

+            0, NULL},

+

            {EDU_END, 0, 0, 0, 0, 0, 0, NULL},

        };



diff --git a/src/msg/common/mqsv_evt.h b/src/msg/common/mqsv_evt.h

index ef7c739..4e26bf6 100644

--- a/src/msg/common/mqsv_evt.h

+++ b/src/msg/common/mqsv_evt.h

@@ -62,7 +62,9 @@ typedef enum mqp_req_type {

   MQP_EVT_SEND_MSG_ASYNC,

   MQP_EVT_CB_DUMP, /* For CPND CB Dump */

   MQP_EVT_Q_RET_TIME_SET_REQ,

-  MQP_EVT_CLM_NOTIFY

+  MQP_EVT_CLM_NOTIFY,

+  MQP_EVT_CAP_SET_REQ,

+  MQP_EVT_CAP_GET_REQ

 } MQP_REQ_TYPE;



 /* Enums for MQP Message Types */

@@ -81,7 +83,9 @@ typedef enum mqp_rsp_type {

   MQP_EVT_TRANSFER_QUEUE_RSP,

   MQP_EVT_MQND_RESTART_RSP,

   MQP_EVT_STAT_UPD_RSP,

-  MQP_EVT_Q_RET_TIME_SET_RSP

+  MQP_EVT_Q_RET_TIME_SET_RSP,

+  MQP_EVT_CAP_SET_RSP,

+  MQP_EVT_CAP_GET_RSP

 } MQP_RSP_TYPE;



 /* Enums for MQD messages */

@@ -188,6 +192,11 @@ typedef struct mqp_q_ret_time_set_rsp {

   SaMsgQueueHandleT queueHandle;

 } MQP_Q_RET_TIME_SET_RSP;



+typedef struct mqp_capacity {

+  SaMsgQueueHandleT queueHandle;

+  SaMsgQueueThresholdsT thresholds;

+} MQP_CAPACITY;

+

 /* saMsgQueueOpen(): */



 typedef struct mqp_open_req {

@@ -396,6 +405,7 @@ typedef struct mqp_req_msg {

     MQP_UPDATE_STATS statsReq;

     MQP_Q_RET_TIME_SET_REQ retTimeSetReq;

     MQP_CLM_NOTIFY clmNotify;

+    MQP_CAPACITY capacity;

   } info;

 } MQP_REQ_MSG;



@@ -415,6 +425,7 @@ typedef struct mqp_rsp_msg {

     MQP_QUEUE_REPLY_RSP replyRsp;

     MQP_SEND_MSG_RSP sendMsgRsp;

     MQP_Q_RET_TIME_SET_RSP retTimeSetRsp;

+    MQP_CAPACITY capacity;

   } info;

 } MQP_RSP_MSG;



diff --git a/src/msg/msgd/mqd_api.c b/src/msg/msgd/mqd_api.c

index 8416851..83d5c21 100644

--- a/src/msg/msgd/mqd_api.c

+++ b/src/msg/msgd/mqd_api.c

@@ -49,10 +49,11 @@



 #include "msg/msgd/mqd.h"

 #include "mqd_imm.h"

+#include "mqd_ntf.h"



 MQDLIB_INFO gl_mqdinfo;



-enum { FD_TERM = 0, FD_AMF, FD_MBCSV, FD_MBX, FD_CLM, NUM_FD };

+enum { FD_TERM = 0, FD_AMF, FD_MBCSV, FD_MBX, FD_CLM, FD_NTF, NUM_FD };



 static struct pollfd fds[NUM_FD];

 static nfds_t nfds = NUM_FD;

@@ -146,12 +147,50 @@ static SaAisErrorT mqd_clm_init(MQD_CB *cb)

                TRACE_1("saClmClusterTrack success");

        } while (false);



-       if (saErr != SA_AIS_OK && !cb->clm_hdl)

+       if (saErr != SA_AIS_OK && cb->clm_hdl)

                saClmFinalize(cb->clm_hdl);



        return saErr;

 }



+static SaAisErrorT mqd_ntf_init(MQD_CB *cb)

+{

+       SaAisErrorT rc = SA_AIS_OK;

+       TRACE_ENTER();

+

+       do {

+               SaVersionT ntfVersion = { 'A', 1, 1 };

+               SaNtfCallbacksT callbacks = {

+                       mqdNtfNotificationCallback,

+                       0

+               };

+

+               rc = saNtfInitialize(&cb->ntfHandle, &callbacks, &ntfVersion);

+               if (rc != SA_AIS_OK) {

+                       LOG_ER("saNtfInitialize failed with error %u", rc);

+                       break;

+               }

+

+               rc = saNtfSelectionObjectGet(cb->ntfHandle, &cb->ntf_sel_obj);

+               if (SA_AIS_OK != rc) {

+                       LOG_ER("saNtfSelectionObjectGet failed with error %u",

+                              rc);

+                       break;

+               }

+

+               rc = mqdInitNtfSubscriptions(cb->ntfHandle);

+               if (rc != SA_AIS_OK)

+                       break;

+       } while (false);

+

+       if (rc != SA_AIS_OK && cb->ntfHandle)

+               saNtfFinalize(cb->ntfHandle);

+

+       TRACE_LEAVE();

+

+       return rc;

+}

+

 /****************************************************************************\

   PROCEDURE NAME : mqd_lib_init



@@ -306,6 +345,16 @@ static uint32_t mqd_lib_init(void)

                mqd_cb_shut(pMqd);

                return NCSCC_RC_FAILURE;

        }

+

+       if (mqd_ntf_init(pMqd) != SA_AIS_OK) {

+               mqd_asapi_unbind();

+               saAmfFinalize(pMqd->amf_hdl);

+               saClmFinalize(pMqd->clm_hdl);

+               ncshm_give_hdl(pMqd->hdl);

+               mqd_cb_shut(pMqd);

+               return NCSCC_RC_FAILURE;

+       }

+

        if ((rc = initialize_for_assignment(pMqd, pMqd->ha_state)) !=

            NCSCC_RC_SUCCESS) {

                LOG_ER("initialize_for_assignment FAILED %u", (unsigned)rc);

@@ -493,6 +542,8 @@ void mqd_main_process(uint32_t hdl)

                fds[FD_CLM].events = POLLIN;

                fds[FD_MBCSV].fd = pMqd->mbcsv_sel_obj;

                fds[FD_MBCSV].events = POLLIN;

+               fds[FD_NTF].fd = pMqd->ntf_sel_obj;

+               fds[FD_NTF].events = POLLIN;



                int ret = poll(fds, nfds, -1);

                if (ret == -1) {

@@ -522,6 +573,15 @@ void mqd_main_process(uint32_t hdl)

                                LOG_ER("saClmDispatch failed: %u", err);

                        }

                }

+

+               if (fds[FD_NTF].revents & POLLIN) {

+                       /* dispatch all the NTF pending function */

+                       err = saNtfDispatch(pMqd->ntfHandle, SA_DISPATCH_ALL);

+                       if (err != SA_AIS_OK) {

+                               LOG_ER("saNtfDispatch failed: %u", err);

+                       }

+               }

+

                /* dispatch all the MBCSV pending callbacks */

                if (fds[FD_MBCSV].revents & POLLIN) {

                        mbcsv_arg.i_op = NCS_MBCSV_OP_DISPATCH;

diff --git a/src/msg/msgd/mqd_db.h b/src/msg/msgd/mqd_db.h

index 022b03a..93dc18f 100644

--- a/src/msg/msgd/mqd_db.h

+++ b/src/msg/msgd/mqd_db.h

@@ -34,6 +34,7 @@

 #include <stdbool.h>

 #include <saClm.h>

 #include <saImmOi.h>

+#include <saNtf.h>



 #define MQSV_MQD_MBCSV_VERSION 1

 #define MQSV_MQD_MBCSV_VERSION_MIN 1

@@ -91,6 +92,7 @@ typedef struct mqd_qinfo {

   NCS_QUEUE ilist; /* Queue/Group element list */

   NCS_QUEUE tlist; /* List of the user who opted track */

   SaTimeT creationTime;

+  bool capacityReached;

 } MQD_OBJ_INFO;



 typedef struct mqd_object_elem {

@@ -146,6 +148,7 @@ typedef struct mqd_cb {

   SaClmHandleT clm_hdl;

   SaAmfHAStateT ha_state; /* Present AMF HA state of the component */

   SaNameT comp_name;

+  SaNtfHandleT ntfHandle;



   NCS_MBCSV_HDL mbcsv_hdl;       /*MBCSV handle for Redundancy of initialize  
*/

   NCS_MBCSV_CKPT_HDL o_ckpt_hdl; /*Opened Checkpoint Handle */

@@ -171,6 +174,7 @@ typedef struct mqd_cb {

   SaSelectionObjectT imm_sel_obj; /*Selection object to wait for

                                      IMM events */

   SaSelectionObjectT clm_sel_obj;

+  SaSelectionObjectT ntf_sel_obj;

   bool fully_initialized;

 } MQD_CB;



diff --git a/src/msg/msgd/mqd_ntf.cc b/src/msg/msgd/mqd_ntf.cc

new file mode 100644

index 0000000..ab29193

--- /dev/null

+++ b/src/msg/msgd/mqd_ntf.cc

@@ -0,0 +1,310 @@

+/*      -*- OpenSAF  -*-

+ *

+ * (C) Copyright 2017 The OpenSAF Foundation

+ *

+ * This program is distributed in the hope that it will be useful, but

+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY

+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed

+ * under the GNU Lesser General Public License Version 2.1, February 1999.

+ * The complete license can be accessed from the following location:

+ * http://opensource.org/licenses/lgpl-license.php

+ * See the Copying file included with the OpenSAF distribution for full

+ * licensing terms.

+ *

+ * Author(s): Genband

+ *

+ */

+#include <cstring>

+#include <vector>

+#include <saMsg.h>

+#include "base/ncs_edu_pub.h"

+#include "base/logtrace.h"

+#include "base/ncs_queue.h"

+#include "base/ncspatricia.h"

+#include "base/ncssysf_tmr.h"

+#include "mbc/mbcsv_papi.h"

+#include "mds/mds_papi.h"

+#include "msg/common/mqsv_def.h"

+#include "msg/common/mqsv_asapi.h"

+#include "mqd_tmr.h"

+#include "mqd_db.h"

+#include "mqd_dl_api.h"

+#include "mqd_ntf.h"

+

+extern MQDLIB_INFO gl_mqdinfo;

+

+static const int operStateSubId = 1;

+

+// Sends an NTF object state change notification for a queue group.
+//
+// cb         - MQD control block; its ntfHandle is used to allocate the
+//              notification.
+// queueGroup - name of the queue group, copied into notificationObject.
+// status     - queue-level capacity statuses are mapped to their group-level
+//              counterparts first; any other value is sent unchanged.
+//
+// Failures are logged; nothing is reported back to the caller.
+static void sendStateChangeNotification(MQD_CB *cb,
+                                        const SaNameT& queueGroup,
+                                        SaUint16T status) {
+  TRACE_ENTER();
+
+  do {
+    if (status == SA_MSG_QUEUE_CAPACITY_REACHED)
+      status = SA_MSG_QUEUE_GROUP_CAPACITY_REACHED;
+    else if (status == SA_MSG_QUEUE_CAPACITY_AVAILABLE)
+      status = SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE;
+
+    const SaNameT notifyingObject = {
+      sizeof("safApp=safMsgService") - 1,
+      "safApp=safMsgService"
+    };
+
+    SaNtfStateChangeNotificationT ntfStateChange = { 0 };
+
+    // No correlated notifications are requested: this function never fills
+    // one in, so asking for a slot would send an unset identifier.
+    SaAisErrorT rc(saNtfStateChangeNotificationAllocate(
+        cb->ntfHandle,
+        &ntfStateChange,
+        0,    // number of correlated notifications
+        0,    // length of additional text
+        0,    // number of additional info entries
+        1,    // number of state changes
+        0));  // variable data size
+
+    if (rc != SA_AIS_OK) {
+      LOG_ER("saNtfStateChangeNotificationAllocate failed: %i", rc);
+      break;
+    }
+
+    *ntfStateChange.notificationHeader.eventType = SA_NTF_OBJECT_STATE_CHANGE;
+
+    memcpy(ntfStateChange.notificationHeader.notificationObject,
+           &queueGroup,
+           sizeof(SaNameT));
+
+    memcpy(ntfStateChange.notificationHeader.notifyingObject,
+           &notifyingObject,
+           sizeof(SaNameT));
+
+    ntfStateChange.notificationHeader.notificationClassId->vendorId =
+      SA_NTF_VENDOR_ID_SAF;
+    ntfStateChange.notificationHeader.notificationClassId->majorId =
+      SA_SVC_MSG;
+    ntfStateChange.notificationHeader.notificationClassId->minorId =
+      (status == SA_MSG_QUEUE_GROUP_CAPACITY_REACHED) ? 0x67 : 0x68;
+
+    *ntfStateChange.sourceIndicator = SA_NTF_OBJECT_OPERATION;
+
+    ntfStateChange.changedStates[0].stateId = SA_MSG_DEST_CAPACITY_STATUS;
+
+    // An old state is only reported for anything other than "reached".
+    ntfStateChange.changedStates[0].oldStatePresent =
+      (status == SA_MSG_QUEUE_GROUP_CAPACITY_REACHED) ? SA_FALSE : SA_TRUE;
+
+    if (status == SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE) {
+      ntfStateChange.changedStates[0].oldState =
+        SA_MSG_QUEUE_GROUP_CAPACITY_REACHED;
+    }
+
+    ntfStateChange.changedStates[0].newState = status;
+
+    rc = saNtfNotificationSend(ntfStateChange.notificationHandle);
+
+    if (rc != SA_AIS_OK)
+      LOG_ER("saNtfNotificationSend failed: %i", rc);
+
+    // The notification is released whether or not the send succeeded.
+    rc = saNtfNotificationFree(ntfStateChange.notificationHandle);
+
+    if (rc != SA_AIS_OK)
+      LOG_ER("saNtfNotificationFree failed: %i", rc);
+  } while (false);
+
+  TRACE_LEAVE();
+}

+

+// Records the new capacity status of a queue and notifies for every queue
+// group whose state changes because of it.
+//
+// cb        - MQD control block; cb->qdb is walked for queue groups.
+// queueName - name of the queue whose capacity status changed.
+// status    - SA_MSG_QUEUE_CAPACITY_REACHED or
+//             SA_MSG_QUEUE_CAPACITY_AVAILABLE; any other value leaves the
+//             capacityReached flag alone and sends nothing.
+//
+// Each group is judged on its own members: a notification is sent for a
+// group only if the queue belongs to it and every other queue in that same
+// group is marked as having reached capacity. Members of unrelated groups
+// have no influence on the decision.
+static void updateQueueGroup(MQD_CB *cb,
+                             const SaNameT& queueName,
+                             SaUint16T status) {
+  TRACE_ENTER();
+
+  typedef std::vector<SaNameT> GroupList;
+  GroupList groupList;
+
+  // maybe there is a better way to do this?
+  for (MQD_OBJ_NODE *objNode(reinterpret_cast<MQD_OBJ_NODE *>
+         (ncs_patricia_tree_getnext(&cb->qdb, 0)));
+       objNode;
+       objNode = reinterpret_cast<MQD_OBJ_NODE *>(ncs_patricia_tree_getnext(
+         &cb->qdb,
+         reinterpret_cast<uint8_t *>(&objNode->oinfo.name)))) {
+
+    if (objNode->oinfo.type != MQSV_OBJ_QGROUP)
+      continue;
+
+    // Both flags are per group, so they start afresh for every group.
+    bool member(false), othersFull(true);
+    NCS_Q_ITR itr = { 0 };
+
+    for (MQD_OBJECT_ELEM *objElem(static_cast<MQD_OBJECT_ELEM *>
+           (ncs_queue_get_next(&objNode->oinfo.ilist, &itr)));
+         objElem;
+         objElem = static_cast<MQD_OBJECT_ELEM *>
+           (ncs_queue_get_next(&objNode->oinfo.ilist, &itr))) {
+      if (objElem->pObject->type != MQSV_OBJ_QUEUE)
+        continue;
+
+      if (queueName.length == objElem->pObject->name.length &&
+          !memcmp(queueName.value,
+                  objElem->pObject->name.value,
+                  queueName.length)) {
+        member = true;
+        if (status == SA_MSG_QUEUE_CAPACITY_REACHED)
+          objElem->pObject->capacityReached = true;
+        else if (status == SA_MSG_QUEUE_CAPACITY_AVAILABLE)
+          objElem->pObject->capacityReached = false;
+      } else if (!objElem->pObject->capacityReached) {
+        othersFull = false;
+      }
+    }
+
+    if (member && othersFull)
+      groupList.push_back(objNode->oinfo.name);
+  }
+
+  // "reached": the last non-full member of the group just filled up.
+  // "available": the group was full and one member has room again.
+  if (status == SA_MSG_QUEUE_CAPACITY_REACHED ||
+      status == SA_MSG_QUEUE_CAPACITY_AVAILABLE) {
+    for (const auto& group : groupList)
+      sendStateChangeNotification(cb, group, status);
+  } else {
+    // don't care about anything else
+  }
+
+  TRACE_LEAVE();
+}

+

+// Tells whether a notification class id carries the SAF vendor id together
+// with the MSG service major id.
+static bool isMSGStateChange(const SaNtfClassIdT &classId) {
+  TRACE_ENTER();
+
+  const bool fromMsg(classId.vendorId == SA_NTF_VENDOR_ID_SAF &&
+                     classId.majorId == SA_SVC_MSG);
+
+  TRACE_LEAVE2("%i", fromMsg);
+  return fromMsg;
+}

+

+// Validates a MSG state change notification and, when it is a single
+// SA_MSG_DEST_CAPACITY_STATUS change coming from an object operation, hands
+// the affected object name and its new state to updateQueueGroup().
+// Anything else is logged and dropped.
+static void handleMsgObjectStateChangeNotification(
+  MQD_CB *cb,
+  const SaNtfStateChangeNotificationT& stateChangeNotification) {
+  TRACE_ENTER();
+
+  const SaNtfStateChangeNotificationT& ntf(stateChangeNotification);
+
+  if (*ntf.sourceIndicator != SA_NTF_OBJECT_OPERATION) {
+    LOG_ER("expected object operation -- got %i", *ntf.sourceIndicator);
+  } else if (ntf.numStateChanges != 1) {
+    LOG_ER("expected one state change -- got %i", ntf.numStateChanges);
+  } else if (ntf.changedStates[0].stateId != SA_MSG_DEST_CAPACITY_STATUS) {
+    LOG_ER("unknown state id: %i", ntf.changedStates[0].stateId);
+  } else {
+    // if all the queues for a group are full then send a notification
+    updateQueueGroup(cb,
+                     *ntf.notificationHeader.notificationObject,
+                     ntf.changedStates[0].newState);
+  }
+
+  TRACE_LEAVE();
+}

+

+// Forwards a state change notification to the MSG-specific handler when it
+// has a class id that isMSGStateChange() accepts; every other notification
+// is only traced.
+static void handleStateChangeNotification(
+  MQD_CB *cb,
+  const SaNtfStateChangeNotificationT& stateChangeNotification) {
+  TRACE_ENTER();
+
+  const SaNtfClassIdT *classId(
+      stateChangeNotification.notificationHeader.notificationClassId);
+
+  if (!classId || !isMSGStateChange(*classId)) {
+    TRACE("ignoring non-MSG state change notification");
+  } else {
+    handleMsgObjectStateChangeNotification(cb, stateChangeNotification);
+  }
+
+  TRACE_LEAVE();
+}

+

+// NTF notification callback of MQD.
+//
+// subscriptionId - id of the subscription the notification arrived on; only
+//                  operStateSubId is processed.
+// notification   - the received notification; only state change
+//                  notifications are processed.
+//
+// Notifications are acted upon only while this MQD is HA active. When the
+// control block cannot be obtained the notification is dropped.
+void mqdNtfNotificationCallback(SaNtfSubscriptionIdT subscriptionId,
+                                const SaNtfNotificationsT *notification) {
+  TRACE_ENTER();
+  MQD_CB *cb(static_cast<MQD_CB *>(ncshm_take_hdl(NCS_SERVICE_ID_MQD,
+                                                  gl_mqdinfo.inst_hdl)));
+
+  if (!cb) {
+    // Nothing below may touch cb, and there is no handle to give back.
+    LOG_ER("can't get mqd control block");
+  } else {
+    do {
+      if (cb->ha_state != SA_AMF_HA_ACTIVE)
+        break;
+
+      if (subscriptionId != operStateSubId) {
+        TRACE("unknown subscription id received in "
+              "mqdNtfNotificationCallback: %d",
+              subscriptionId);
+        break;
+      }
+
+      if (notification->notificationType == SA_NTF_TYPE_STATE_CHANGE) {
+        handleStateChangeNotification(
+            cb,
+            notification->notification.stateChangeNotification);
+      } else {
+        TRACE("ignoring NTF notification of type: %i",
+              notification->notificationType);
+      }
+    } while (false);
+
+    ncshm_give_hdl(cb->hdl);
+  }
+
+  // A notification delivered to the callback has to be released by the
+  // subscriber, whether or not it was acted upon. Only the state change
+  // filter is subscribed, so that is the only type released here.
+  if (notification->notificationType == SA_NTF_TYPE_STATE_CHANGE) {
+    const SaAisErrorT rc(saNtfNotificationFree(
+        notification->notification.stateChangeNotification
+            .notificationHandle));
+
+    if (rc != SA_AIS_OK)
+      LOG_ER("saNtfNotificationFree failed: %i", rc);
+  }
+
+  TRACE_LEAVE();
+}

+

+// Subscribes to state change notifications under subscription id
+// operStateSubId.
+//
+// ntfHandle - initialized NTF handle the filter is allocated on.
+//
+// Returns SA_AIS_OK on success. On failure returns the error of the filter
+// allocation or of the subscribe call; if both of those succeed, the error
+// of freeing the filter. The filter is freed once it has been allocated,
+// including when subscribing fails.
+SaAisErrorT mqdInitNtfSubscriptions(SaNtfHandleT ntfHandle) {
+  TRACE_ENTER();
+  SaAisErrorT rc(SA_AIS_OK);
+
+  do {
+    SaNtfStateChangeNotificationFilterT filter;
+
+    rc = saNtfStateChangeNotificationFilterAllocate(
+      ntfHandle,
+      &filter, 0, 0, 0, 0, 0, 0);
+
+    if (rc != SA_AIS_OK) {
+      LOG_ER("saNtfStateChangeNotificationFilterAllocate"
+        " failed: %i",
+        rc);
+      break;
+    }
+
+    // Only the state change slot of the filter handle set is populated.
+    SaNtfNotificationTypeFilterHandlesT filterHandles = {
+      0, 0, filter.notificationFilterHandle, 0, 0
+    };
+
+    rc = saNtfNotificationSubscribe(&filterHandles,
+      operStateSubId);
+
+    if (rc != SA_AIS_OK)
+      LOG_ER("saNtfNotificationSubscribe failed: %i", rc);
+
+    // Free the filter on both outcomes so a failed subscribe does not leak
+    // it; a subscribe error takes precedence in the return value.
+    const SaAisErrorT freeRc(
+      saNtfNotificationFilterFree(filter.notificationFilterHandle));
+
+    if (freeRc != SA_AIS_OK) {
+      LOG_ER("saNtfNotificationFilterFree failed: %i", freeRc);
+      if (rc == SA_AIS_OK)
+        rc = freeRc;
+    }
+  } while (false);
+
+  TRACE_LEAVE();
+  return rc;
+}

diff --git a/src/msg/msgd/mqd_ntf.h b/src/msg/msgd/mqd_ntf.h

new file mode 100644

index 0000000..4d9c0f1

--- /dev/null

+++ b/src/msg/msgd/mqd_ntf.h

@@ -0,0 +1,49 @@

+/*      -*- OpenSAF  -*-

+ *

+ * (C) Copyright 2017 The OpenSAF Foundation

+ *

+ * This program is distributed in the hope that it will be useful, but

+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY

+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed

+ * under the GNU Lesser General Public License Version 2.1, February 1999.

+ * The complete license can be accessed from the following location:

+ * http://opensource.org/licenses/lgpl-license.php

+ * See the Copying file included with the OpenSAF distribution for full

+ * licensing terms.

+ *

+ * Author(s): Genband

+ *

+ */

+

+/*****************************************************************************

+..............................................................................

+

+  FILE NAME: mqd_ntf.h

+

+..............................................................................

+

+  DESCRIPTION:

+

+  MQD Ntf Structures & Parameters.

+

+******************************************************************************/

+

+#ifndef MSG_MSGD_MQD_NTF_H_

+#define MSG_MSGD_MQD_NTF_H_

+

+#include <saNtf.h>

+

+#ifdef __cplusplus

+extern "C" {

+#endif

+

+/**
+ * NTF notification callback of MQD.
+ *
+ * @param subscriptionId id of the subscription the notification belongs to
+ * @param notification   the received notification
+ */
+void mqdNtfNotificationCallback(SaNtfSubscriptionIdT subscriptionId,
+                                const SaNtfNotificationsT *notification);
+
+/**
+ * Installs the NTF subscriptions of MQD.
+ *
+ * @param ntfHandle initialized NTF handle
+ * @return SA_AIS_OK on success, otherwise the NTF error code
+ */
+SaAisErrorT mqdInitNtfSubscriptions(SaNtfHandleT ntfHandle);

+

+#ifdef __cplusplus

+}

+#endif

+

+#endif

diff --git a/src/msg/msgnd/mqnd_db.h b/src/msg/msgnd/mqnd_db.h

index d04566e..fc7d926 100644

--- a/src/msg/msgnd/mqnd_db.h

+++ b/src/msg/msgnd/mqnd_db.h

@@ -74,6 +74,7 @@ typedef struct mqnd_queue_info {

   SaMsgQueueHandleT listenerHandle; /* Listener queue handle */

   SaNameT queueName;

   SaSizeT size[SA_MSG_MESSAGE_LOWEST_PRIORITY + 1]; /* Size of the queue */

+  SaMsgQueueThresholdsT thresholds;

   SaMsgQueueStatusT queueStatus;        /* Status info of the queue */

   SaMsgQueueSendingStateT sendingState; /* Sending state is removed from B.1.1,

                                            but used internally */

@@ -91,6 +92,8 @@ typedef struct mqnd_queue_info {

   uint32_t numberOfFullErrors[SA_MSG_MESSAGE_LOWEST_PRIORITY + 1];

   SaUint64T totalQueueSize;

   uint32_t shm_queue_index;

+  bool capacityReachedSent;

+  bool capacityAvailableSent;

 } MQND_QUEUE_INFO;



 typedef struct mqnd_qhndl_node {

@@ -127,6 +130,7 @@ typedef struct mqnd_queue_ckpt_info {

   SaMsgQueueHandleT listenerHandle; /* Listener queue handle */

   SaNameT queueName;

   SaSizeT size[SA_MSG_MESSAGE_LOWEST_PRIORITY + 1]; /* Size of the queue */

+  SaMsgQueueThresholdsT thresholds;

   SaMsgQueueCreationFlagsT creationFlags;

   SaTimeT creationTime; /* Queue Creation time */

   SaTimeT retentionTime;

@@ -145,6 +149,8 @@ typedef struct mqnd_queue_ckpt_info {

   QueueStatsShm; /* Structure to store queue stats in shared memory */

   MQND_TMR qtransfer_complete_tmr; /* Q Transfer Complete Timer */

   uint32_t valid; /* To verify whether the checkpoint info is valid or not */

+  bool capacityReachedSent;

+  bool capacityAvailableSent;

 } MQND_QUEUE_CKPT_INFO;



 typedef struct mqnd_shm_info {

diff --git a/src/msg/msgnd/mqnd_evt.c b/src/msg/msgnd/mqnd_evt.c

index 0c2a612..fcbe4d6 100644

--- a/src/msg/msgnd/mqnd_evt.c

+++ b/src/msg/msgnd/mqnd_evt.c

@@ -42,6 +42,8 @@ static uint32_t mqnd_evt_proc_update_stats_shm(MQND_CB *cb,

                                               MQSV_DSEND_EVT *evt);

 static uint32_t mqnd_evt_proc_cb_dump(void);

 static uint32_t mqnd_evt_proc_ret_time_set(MQND_CB *cb, MQSV_EVT *evt);

+static uint32_t mqnd_evt_proc_cap_set(MQND_CB *, MQSV_EVT *);

+static uint32_t mqnd_evt_proc_cap_get(MQND_CB *, MQSV_EVT *);

 static void mqnd_dump_queue_status(MQND_CB *cb, SaMsgQueueStatusT *queueStatus,

                                   uint32_t offset);

 static void mqnd_dump_timer_info(MQND_TMR tmr);

@@ -186,6 +188,12 @@ static uint32_t mqnd_proc_mqp_req_msg(MQND_CB *cb, 
MQSV_EVT *evt)

        case MQP_EVT_Q_RET_TIME_SET_REQ:

                rc = mqnd_evt_proc_ret_time_set(cb, evt);

                break;

+       case MQP_EVT_CAP_SET_REQ:

+               rc = mqnd_evt_proc_cap_set(cb, evt);

+               break;

+       case MQP_EVT_CAP_GET_REQ:

+               rc = mqnd_evt_proc_cap_get(cb, evt);

+               break;

        default:

                LOG_ER("%s:%u: unrecognized message type: %d", __FILE__,

                       __LINE__, evt->msg.mqp_req.type);

@@ -946,6 +954,191 @@ send_rsp:

 }



 /****************************************************************************
+ * Name          : sendStateChangeNotification
+ *
+ * Description   : Send an NTF object state change notification reporting
+ *                 that a queue reached, or regained, capacity. Nothing is
+ *                 sent when the matching "sent" flag of the queue is
+ *                 already set.
+ *
+ * Arguments     : MQND_CB *cb - MQND CB pointer (not used here)
+ *                 MQND_QUEUE_INFO *qInfo - queue info struct; supplies the
+ *                 queue name and holds the capacityReachedSent /
+ *                 capacityAvailableSent flags, which are updated after a
+ *                 successful send
+ *                 SaMsgMessageCapacityStatusT - status to send
+ *
+ * Return Values : None. Failures are logged.
+ *
+ * Notes         : An NTF handle is initialized and finalized on every call
+ *                 that gets past the duplicate check.
+ *****************************************************************************/
+static void sendStateChangeNotification(MQND_CB *cb,
+                                       MQND_QUEUE_INFO *qInfo,
+                                       SaMsgMessageCapacityStatusT status)
+{
+       SaNtfHandleT ntfHandle = 0;
+
+       do {
+               SaVersionT ntfVersion = { 'A', 1, 1 };
+
+               /* do we need to send it? */
+               if ((status == SA_MSG_QUEUE_CAPACITY_REACHED &&
+                       qInfo->capacityReachedSent) ||
+                       (status == SA_MSG_QUEUE_CAPACITY_AVAILABLE &&
+                       qInfo->capacityAvailableSent)) {
+                       break;
+               }
+
+               SaAisErrorT rc = saNtfInitialize(&ntfHandle, 0, &ntfVersion);
+
+               if (rc != SA_AIS_OK) {
+                       LOG_ER("saNtfInitialize failed: %i", rc);
+                       break;
+               }
+
+               const SaNameT notifyingObject = {
+                       sizeof("safApp=safMsgService") - 1,
+                       "safApp=safMsgService"
+               };
+
+               SaNtfStateChangeNotificationT ntfStateChange = { 0 };
+
+               /*
+                * No correlated notifications are requested: none is ever
+                * filled in below, so asking for a slot would send an unset
+                * identifier.
+                */
+               rc = saNtfStateChangeNotificationAllocate(
+                       ntfHandle,
+                       &ntfStateChange,
+                       0, /* number of correlated notifications */
+                       0, /* length of additional text */
+                       0, /* number of additional info entries */
+                       1, /* number of state changes */
+                       0); /* variable data size */
+
+               if (rc != SA_AIS_OK) {
+                       LOG_ER("saNtfStateChangeNotificationAllocate "
+                               "failed: %i", rc);
+                       break;
+               }
+
+               *ntfStateChange.notificationHeader.eventType =
+                       SA_NTF_OBJECT_STATE_CHANGE;
+
+               memcpy(ntfStateChange.notificationHeader.notificationObject,
+                               &qInfo->queueName,
+                               sizeof(SaNameT));
+
+               memcpy(ntfStateChange.notificationHeader.notifyingObject,
+                       &notifyingObject,
+                       sizeof(SaNameT));
+
+               ntfStateChange.notificationHeader.notificationClassId->vendorId =
+                       SA_NTF_VENDOR_ID_SAF;
+               ntfStateChange.notificationHeader.notificationClassId->majorId =
+                       SA_SVC_MSG;
+               ntfStateChange.notificationHeader.notificationClassId->minorId =
+                       (status == SA_MSG_QUEUE_CAPACITY_REACHED) ?
+                       0x65 : 0x66;
+
+               *ntfStateChange.sourceIndicator = SA_NTF_OBJECT_OPERATION;
+
+               ntfStateChange.changedStates[0].stateId =
+                       SA_MSG_DEST_CAPACITY_STATUS;
+
+               /* an old state is only reported when leaving "reached" */
+               ntfStateChange.changedStates[0].oldStatePresent =
+                       (status == SA_MSG_QUEUE_CAPACITY_REACHED) ?
+                       SA_FALSE : SA_TRUE;
+
+               if (status == SA_MSG_QUEUE_CAPACITY_AVAILABLE) {
+                       ntfStateChange.changedStates[0].oldState =
+                               SA_MSG_QUEUE_CAPACITY_REACHED;
+               }
+
+               ntfStateChange.changedStates[0].newState = status;
+
+               rc = saNtfNotificationSend(ntfStateChange.notificationHandle);
+
+               if (rc != SA_AIS_OK)
+                       LOG_ER("saNtfNotificationSend failed: %i", rc);
+               else {
+                       /* remember what was sent so it is not repeated */
+                       if (status == SA_MSG_QUEUE_CAPACITY_REACHED) {
+                               qInfo->capacityReachedSent = true;
+                               qInfo->capacityAvailableSent = false;
+                       } else {
+                               qInfo->capacityReachedSent = false;
+                               qInfo->capacityAvailableSent = true;
+                       }
+               }
+
+               /* released whether or not the send succeeded */
+               rc = saNtfNotificationFree(ntfStateChange.notificationHandle);
+
+               if (rc != SA_AIS_OK)
+                       LOG_ER("saNtfNotificationFree failed: %i", rc);
+       } while (false);
+
+       if (ntfHandle) {
+               SaAisErrorT rc = saNtfFinalize(ntfHandle);
+
+               if (rc != SA_AIS_OK)
+                       LOG_ER("saNtfFinalize failed: %i", rc);
+       }
+}

+

+/****************************************************************************

+ * Name          : checkCapacity

+ *

+ * Description   : Compares the used space of every priority area of a queue

+ *                 (read from the queue's shared memory stats) with the

+ *                 queue's capacity thresholds and decides whether a

+ *                 capacity state change notification is to be sent.

+ *

+ * Arguments     : MQND_CB *cb - MQND CB pointer

+ *                 MQND_QUEUE_INFO *qInfo - queue info

+ *

+ * Return Values : None.

+ *

+ * Notes         : The priority areas are examined from highest to lowest

+ *                 priority and the scan stops at the first area that settles

+ *                 the outcome. If no area settles it, a "capacity reached"

+ *                 notification is requested; sendStateChangeNotification()

+ *                 drops it when one was already sent.

+ *****************************************************************************/

+static void checkCapacity(MQND_CB *cb, MQND_QUEUE_INFO *qInfo)

+{

+       int i;

+       uint32_t offset = qInfo->shm_queue_index;

+       MQND_QUEUE_CKPT_INFO *shm_base_addr = cb->mqnd_shm.shm_base_addr;

+       bool sendNotification = true;

+       SaMsgMessageCapacityStatusT capacityType =

+               SA_MSG_QUEUE_CAPACITY_REACHED;

+

+       TRACE_ENTER();

+

+       /* walk the priority areas to decide on a capacity notification */

+       for (i = SA_MSG_MESSAGE_HIGHEST_PRIORITY;

+               i <= SA_MSG_MESSAGE_LOWEST_PRIORITY;

+               i++)

+       {

+               TRACE("capacityReached[%i]: %llu capacityAvailable: %llu "

+                               "queueUsed: %llu",

+                               i,

+                               qInfo->thresholds.capacityReached[i],

+                               qInfo->thresholds.capacityAvailable[i],

+                               shm_base_addr[offset].QueueStatsShm.

+                                       saMsgQueueUsage[i].queueUsed);

+               if (!qInfo->capacityReachedSent &&

+                       qInfo->thresholds.capacityReached[i] >=

+                               shm_base_addr[offset].QueueStatsShm.

+                               saMsgQueueUsage[i].queueUsed) {

+                       /*

+                        * "reached" has not been sent yet and this area's

+                        * usage does not exceed its capacityReached

+                        * threshold: only send notification if all are full

+                        */

+                       sendNotification = false;

+                       TRACE("not sending notification");

+                       break;

+               } else if (!qInfo->capacityAvailableSent &&

+                               qInfo->thresholds.capacityAvailable[i] >

+                                       shm_base_addr[offset].QueueStatsShm.

+                                       saMsgQueueUsage[i].queueUsed) {

+                       /*

+                        * "available" has not been sent yet and this area's

+                        * usage is below its capacityAvailable threshold:

+                        * send notification if at least one comes available

+                        */

+                       TRACE("sending available notification");

+                       sendNotification = true;

+                       capacityType = SA_MSG_QUEUE_CAPACITY_AVAILABLE;

+                       break;

+               }

+       }

+

+       TRACE("sendNotification: %i", sendNotification);

+       if (sendNotification)

+               sendStateChangeNotification(cb, qInfo, capacityType);

+

+       TRACE_LEAVE();

+}

+

+/****************************************************************************

  * Name          : mqnd_evt_proc_update_stats_shm

  *

  * Description   : Function to update stats of queue in shm when message is

@@ -998,7 +1191,10 @@ static uint32_t mqnd_evt_proc_update_stats_shm(MQND_CB 
*cb, MQSV_DSEND_EVT *evt)



        statsReq = &evt->info.statsReq;



-       if (cb->is_restart_done)

+       if (!cb->clm_node_joined) {

+               err = SA_AIS_ERR_UNAVAILABLE;

+               goto done;

+       } else if (cb->is_restart_done)

                err = SA_AIS_OK;

        else {

                err = SA_AIS_ERR_TRY_AGAIN;

@@ -1039,6 +1235,8 @@ static uint32_t mqnd_evt_proc_update_stats_shm(MQND_CB 
*cb, MQSV_DSEND_EVT *evt)

                goto done;

        }



+       checkCapacity(cb, &qnode->qinfo);

+

 done:



        direct_rsp_evt =

@@ -1292,6 +1490,9 @@ static uint32_t mqnd_evt_proc_send_msg(MQND_CB *cb, 
MQSV_DSEND_EVT *evt)



        mqnd_send_msg_update_stats_shm(cb, qnode, snd_msg->message.size,

                                       snd_msg->message.priority);

+

+       checkCapacity(cb, &qnode->qinfo);

+

 send_resp:

        /* If the error happens in SendReceive case while sending the message

           then return the response from here and don't wait for the reply

@@ -1741,6 +1942,162 @@ send_rsp:

 }



 /****************************************************************************
+ * Name          : mqnd_evt_proc_cap_set
+ *
+ * Description   : Handles MQP_EVT_CAP_SET_REQ: checks the requested
+ *                 capacityReached thresholds against the sizes of the
+ *                 queue's priority areas, stores the thresholds on the
+ *                 queue and sends the response.
+ *
+ * Arguments     : MQND_CB *cb - MQND CB pointer
+ *                 MQSV_EVT *evt - received request event
+ *
+ * Return Values : Result of mqnd_mds_send_rsp() (NCSCC_RC_SUCCESS/Error).
+ *
+ * Notes         : The SA_AIS_* outcome is carried in the response event.
+ *                 NOTE(review): the queue handle is read through the
+ *                 retTimeSetReq member of the request -- confirm that it
+ *                 overlays the handle of the capacity request.
+ *****************************************************************************/
+static uint32_t mqnd_evt_proc_cap_set(MQND_CB *cb, MQSV_EVT *evt)
+{
+       SaAisErrorT err = SA_AIS_OK;
+       MQND_QUEUE_NODE *qnode = NULL;
+       MQSV_EVT rsp_evt;
+       uint32_t rc = NCSCC_RC_SUCCESS;
+       int prio;
+
+       TRACE_ENTER();
+
+       if (!cb->clm_node_joined) {
+               err = SA_AIS_ERR_UNAVAILABLE;
+       } else if (!cb->is_restart_done) {
+               LOG_ER("%s:%u: ERR_TRY_AGAIN: MQND is not completely "
+                               "Initialized",
+                               __FILE__, __LINE__);
+               err = SA_AIS_ERR_TRY_AGAIN;
+       } else {
+               mqnd_queue_node_get(cb,
+                               evt->msg.mqp_req.info.retTimeSetReq.queueHandle,
+                               &qnode);
+
+               if (!qnode) {
+                       /* queue not found */
+                       LOG_ER("ERR_BAD_HANDLE: Get queue node Failed");
+                       err = SA_AIS_ERR_BAD_HANDLE;
+               } else {
+                       /*
+                        * The agent has already checked capacityAvailable
+                        * and capacityReached against each other; what is
+                        * left is to check them against the area sizes.
+                        */
+                       for (prio = SA_MSG_MESSAGE_HIGHEST_PRIORITY;
+                                       prio <= SA_MSG_MESSAGE_LOWEST_PRIORITY;
+                                       prio++) {
+                               if (qnode->qinfo.size[prio] <
+                                       evt->msg.mqp_req.info.capacity.thresholds.capacityReached[prio]) {
+                                       TRACE("size is less than capacity reached");
+                                       err = SA_AIS_ERR_INVALID_PARAM;
+                                       break;
+                               }
+                       }
+
+                       if (err == SA_AIS_OK)
+                               qnode->qinfo.thresholds =
+                                       evt->msg.mqp_req.info.capacity.thresholds;
+               }
+       }
+
+       /* Send the resp to MQA */
+       memset(&rsp_evt, 0, sizeof(MQSV_EVT));
+
+       rsp_evt.type = MQSV_EVT_MQP_RSP;
+       rsp_evt.msg.mqp_rsp.type = MQP_EVT_CAP_SET_RSP;
+       rsp_evt.msg.mqp_rsp.error = err;
+
+       rc = mqnd_mds_send_rsp(cb, &evt->sinfo, &rsp_evt);
+
+       if (rc == NCSCC_RC_SUCCESS)
+               TRACE_1("Queue Capacity set: Mds Send Response Success");
+       else
+               LOG_ER(
+                   "Queue Capacity set :Mds Send Response Failed %" PRIx64,
+                   evt->sinfo.dest);
+
+       TRACE_LEAVE2("Returned with return code %u", rc);
+       return rc;
+}

+

+/****************************************************************************
+ * Name          : mqnd_evt_proc_cap_get
+ *
+ * Description   : Handles MQP_EVT_CAP_GET_REQ: looks up the queue and
+ *                 returns its capacity thresholds in the response.
+ *
+ * Arguments     : MQND_CB *cb - MQND CB pointer
+ *                 MQSV_EVT *evt - received request event
+ *
+ * Return Values : Result of mqnd_mds_send_rsp() (NCSCC_RC_SUCCESS/Error).
+ *
+ * Notes         : The SA_AIS_* outcome is carried in the response event.
+ *                 The thresholds are filled in only when the queue was
+ *                 found; on every error they stay zeroed.
+ *                 NOTE(review): the queue handle is read through the
+ *                 retTimeSetReq member of the request -- confirm that it
+ *                 overlays the handle of the capacity request.
+ *****************************************************************************/
+static uint32_t mqnd_evt_proc_cap_get(MQND_CB *cb, MQSV_EVT *evt)
+{
+       SaAisErrorT err = SA_AIS_OK;
+       MQSV_EVT rsp_evt;
+       MQND_QUEUE_NODE *qnode = NULL;
+       uint32_t rc = NCSCC_RC_SUCCESS;
+
+       TRACE_ENTER();
+
+       do {
+               if (!cb->clm_node_joined) {
+                       err = SA_AIS_ERR_UNAVAILABLE;
+                       break;
+               } else if (cb->is_restart_done) {
+                       err = SA_AIS_OK;
+               } else {
+                       LOG_ER("%s:%u: ERR_TRY_AGAIN: MQND is not completely "
+                                       "Initialized",
+                                       __FILE__, __LINE__);
+                       err = SA_AIS_ERR_TRY_AGAIN;
+                       break;
+               }
+
+               mqnd_queue_node_get(cb,
+                               evt->msg.mqp_req.info.retTimeSetReq.queueHandle,
+                               &qnode);
+
+               /* If queue not found */
+               if (!qnode) {
+                       LOG_ER("ERR_BAD_HANDLE: Get queue node Failed");
+                       err = SA_AIS_ERR_BAD_HANDLE;
+                       break;
+               }
+       } while (false);
+
+       /*Send the resp to MQA */
+       memset(&rsp_evt, 0, sizeof(MQSV_EVT));
+
+       rsp_evt.type = MQSV_EVT_MQP_RSP;
+       rsp_evt.msg.mqp_rsp.type = MQP_EVT_CAP_GET_RSP;
+       rsp_evt.msg.mqp_rsp.error = err;
+
+       /*
+        * qnode is still NULL on every error path above, so it may only be
+        * dereferenced after a successful lookup.
+        */
+       if (err == SA_AIS_OK && qnode)
+               rsp_evt.msg.mqp_rsp.info.capacity.thresholds =
+                       qnode->qinfo.thresholds;
+
+       rc = mqnd_mds_send_rsp(cb, &evt->sinfo, &rsp_evt);
+
+       if (rc != NCSCC_RC_SUCCESS)
+               LOG_ER(
+                   "Queue Capacity get :Mds Send Response Failed %" PRIx64,
+                   evt->sinfo.dest);
+       else
+               TRACE_1("Queue Capacity get: Mds Send Response Success");
+
+       TRACE_LEAVE2("Returned with return code %u", rc);
+       return rc;
+}

+

+/****************************************************************************

  * Name          : mqnd_evt_proc_cb_dump

  *

  * Description   : Function to print the control block infomration

diff --git a/src/msg/msgnd/mqnd_imm.c b/src/msg/msgnd/mqnd_imm.c

index 349790d..feb4aa4 100644

--- a/src/msg/msgnd/mqnd_imm.c

+++ b/src/msg/msgnd/mqnd_imm.c

@@ -460,7 +460,6 @@ SaAisErrorT mqnd_create_runtime_MsgQPriorityobject(SaStringT rname,

                                                   SaImmOiHandleT immOiHandle)

 {

        SaAisErrorT rc = SA_AIS_OK;

-       SaUint64T def_val = 0;

        int i = 0;

        SaImmAttrValueT arr1[1], arr2[1], arr3[1], arr4[1];

        SaImmAttrValuesT_2 attr_mqprio, attr_mqprioSize, attr_capavail,

@@ -484,8 +483,8 @@ SaAisErrorT mqnd_create_runtime_MsgQPriorityobject(SaStringT rname,



                arr1[0] = &mqprdn;

                arr2[0] = &(qnode->qinfo.size[i]);

-               arr3[0] = &def_val; /* not implemented */

-               arr4[0] = &def_val; /* not implemented */

+               arr3[0] = &qnode->qinfo.thresholds.capacityAvailable[i];

+               arr4[0] = &qnode->qinfo.thresholds.capacityReached[i];



                attr_mqprio.attrName = "safMqPrio";

                attr_mqprio.attrValueType = SA_IMM_ATTR_SASTRINGT;

diff --git a/src/msg/msgnd/mqnd_util.c b/src/msg/msgnd/mqnd_util.c

index 8dd79fb..10c78f5 100644

--- a/src/msg/msgnd/mqnd_util.c

+++ b/src/msg/msgnd/mqnd_util.c

@@ -79,6 +79,10 @@ uint32_t mqnd_queue_create(MQND_CB *cb, MQP_OPEN_REQ *open, MDS_DEST *rcvr_mqa,

             i <= SA_MSG_MESSAGE_LOWEST_PRIORITY; i++) {

                qnode->qinfo.totalQueueSize += open->creationAttributes.size[i];

                qnode->qinfo.size[i] = open->creationAttributes.size[i];

+               qnode->qinfo.thresholds.capacityReached[i] =

+                       qnode->qinfo.size[i];

+               qnode->qinfo.thresholds.capacityAvailable[i] =

+                       qnode->qinfo.size[i];

                qnode->qinfo.queueStatus.saMsgQueueUsage[i].queueSize =

                    open->creationAttributes.size[i];

        }

--

2.9.5
------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to