This commit implements Sections 3.4.12 and 6.2.2 of the B.03.01 MSG spec.
---
 src/msg/Makefile.am                        |    8 +-
 src/msg/agent/mqa_api.c                    |  330 +++++++++
 src/msg/apitest/test_CapacityThresholds.cc | 1049 ++++++++++++++++++++++++++++
 src/msg/apitest/test_ErrUnavailable.cc     |  110 ++-
 src/msg/common/mqsv_edu.c                  |   74 +-
 src/msg/common/mqsv_evt.h                  |   15 +-
 src/msg/msgd/mqd_api.c                     |   64 +-
 src/msg/msgd/mqd_db.h                      |    4 +
 src/msg/msgd/mqd_ntf.cc                    |  310 ++++++++
 src/msg/msgd/mqd_ntf.h                     |   49 ++
 src/msg/msgnd/mqnd_db.h                    |    6 +
 src/msg/msgnd/mqnd_evt.c                   |  359 +++++++++-
 src/msg/msgnd/mqnd_imm.c                   |    5 +-
 src/msg/msgnd/mqnd_util.c                  |    4 +
 14 files changed, 2374 insertions(+), 13 deletions(-)
 create mode 100644 src/msg/apitest/test_CapacityThresholds.cc
 create mode 100644 src/msg/msgd/mqd_ntf.cc
 create mode 100644 src/msg/msgd/mqd_ntf.h

diff --git a/src/msg/Makefile.am b/src/msg/Makefile.am
index aeec6ca..c308072 100644
--- a/src/msg/Makefile.am
+++ b/src/msg/Makefile.am
@@ -97,6 +97,7 @@ noinst_HEADERS += \
        src/msg/msgd/mqd_dl_api.h \
        src/msg/msgd/mqd_imm.h \
        src/msg/msgd/mqd_mem.h \
+       src/msg/msgd/mqd_ntf.h \
        src/msg/msgd/mqd_red.h \
        src/msg/msgd/mqd_saf.h \
        src/msg/msgd/mqd_tmr.h \
@@ -158,6 +159,7 @@ bin_osafmsgnd_LDADD = \
        lib/libmsg_common.la \
        lib/libSaAmf.la \
        lib/libSaClm.la \
+       lib/libSaNtf.la \
        lib/libosaf_common.la \
        lib/libSaImmOi.la \
        lib/libSaImmOm.la \
@@ -182,7 +184,8 @@ bin_osafmsgd_SOURCES = \
        src/msg/msgd/mqd_mds.c \
        src/msg/msgd/mqd_red.c \
        src/msg/msgd/mqd_saf.c \
-       src/msg/msgd/mqd_tmr.c
+       src/msg/msgd/mqd_tmr.c \
+       src/msg/msgd/mqd_ntf.cc
 
 bin_osafmsgd_LDADD = \
        lib/libmsg_common.la \
@@ -191,6 +194,7 @@ bin_osafmsgd_LDADD = \
        lib/libosaf_common.la \
        lib/libSaImmOi.la \
        lib/libSaImmOm.la \
+       lib/libSaNtf.la \
        lib/libopensaf_core.la
 
 if ENABLE_TESTS
@@ -205,6 +209,7 @@ noinst_HEADERS += \
 bin_msgtest_SOURCES = \
   src/msg/apitest/msgtest.c \
   src/msg/apitest/test_saMsgVersionT.cc \
+  src/msg/apitest/test_CapacityThresholds.cc \
   src/msg/apitest/test_ErrUnavailable.cc \
   src/msg/apitest/tet_mqa_conf.c \
   src/msg/apitest/tet_mqsv_util.c \
@@ -212,6 +217,7 @@ bin_msgtest_SOURCES = \
 
 bin_msgtest_LDADD = \
   lib/libSaMsg.la \
+  lib/libSaNtf.la \
   lib/libopensaf_core.la \
   lib/libapitest.la
 
diff --git a/src/msg/agent/mqa_api.c b/src/msg/agent/mqa_api.c
index eeec00e..0070aaf 100644
--- a/src/msg/agent/mqa_api.c
+++ b/src/msg/agent/mqa_api.c
@@ -5600,6 +5600,336 @@ done:
        return rc;
 }
 
+/****************************************************************************
+  Name          : saMsgQueueCapacityThresholdsSet
+
+  Description   : This routine sets the critical capacity thresholds for the
+                 queue.
+
+  Arguments     : SaMsgQueueHandleT queueHandle
+               : const SaMsgQueueThresholdsT *thresholds
+
+  Return Values : SaAisErrorT
+
+  Notes         : None
+******************************************************************************/
+SaAisErrorT saMsgQueueCapacityThresholdsSet(SaMsgQueueHandleT queueHandle,
+               const SaMsgQueueThresholdsT *thresholds)
+{
+       SaAisErrorT rc = SA_AIS_OK;
+       MQA_CB *mqa_cb;
+       bool locked = false;
+
+       do {
+               MQSV_EVT cap_evt;
+               MQSV_EVT *out_evt = 0;
+               MQA_QUEUE_INFO *queue_node;
+               uint8_t mds_rc;
+               int i;
+
+               TRACE_ENTER2("SaMsgQueueHandle %llu ", queueHandle);
+
+               /* retrieve MQA CB */
+               mqa_cb = (MQA_CB *)m_MQSV_MQA_RETRIEVE_MQA_CB;
+               if (!mqa_cb) {
+                       TRACE_2("ERR_BAD_HANDLE: Control block retrieval "
+                                       "failed");
+                       rc = SA_AIS_ERR_BAD_HANDLE;
+                       break;
+               }
+
+               if (m_NCS_LOCK(&mqa_cb->cb_lock, NCS_LOCK_WRITE) !=
+                               NCSCC_RC_SUCCESS) {
+                       TRACE_4("ERR_LIBRARY: Lock failed for control block "
+                                       "write");
+                       rc = SA_AIS_ERR_LIBRARY;
+                       break;
+               }
+
+               locked = true;
+
+               /* Check if queueHandle is present in the tree */
+               if ((queue_node = mqa_queue_tree_find_and_add(
+                       mqa_cb, queueHandle, false, NULL, 0)) == NULL) {
+                       TRACE_2("ERR_BAD_HANDLE: Queue Database Find Failed");
+                       rc = SA_AIS_ERR_BAD_HANDLE;
+                       break;
+               }
+
+               if (queue_node->client_info->version.majorVersion <
+                       MQA_MAJOR_VERSION) {
+                       TRACE_2("ERR_VERSION: client not B.03.01");
+                       rc = SA_AIS_ERR_VERSION;
+                       break;
+               }
+
+               if (queue_node->client_info->version.majorVersion ==
+                       MQA_MAJOR_VERSION) {
+                       if (!mqa_cb->clm_node_joined ||
+                               queue_node->client_info->isStale) {
+                               TRACE_2("ERR_UNAVAILABLE: node is not cluster "
+                                       "member");
+                               rc = SA_AIS_ERR_UNAVAILABLE;
+                               break;
+                       }
+               }
+
+               if (!thresholds) {
+                       TRACE_2("ERR_INVALID_PARAM: thresholds is 0");
+                       rc = SA_AIS_ERR_INVALID_PARAM;
+                       break;
+               }
+
+               for (i = SA_MSG_MESSAGE_HIGHEST_PRIORITY;
+                               i <= SA_MSG_MESSAGE_LOWEST_PRIORITY;
+                               i++) {
+                       if (thresholds->capacityAvailable[i] >
+                                       thresholds->capacityReached[i]) {
+                               TRACE_2("ERR_INVALID_PARAM: Available greater "
+                                               "than Reached");
+                               rc = SA_AIS_ERR_INVALID_PARAM;
+                               break;
+                       }
+               }
+
+               if (rc != SA_AIS_OK)
+                       break;
+
+               /* Check if mqnd is up */
+               if (!mqa_cb->is_mqnd_up) {
+                       TRACE_2("ERR_TRY_AGAIN: MQND is down");
+                       rc = SA_AIS_ERR_TRY_AGAIN;
+                       break;
+               }
+
+               /* populate the structure */
+               memset(&cap_evt, 0, sizeof(MQSV_EVT));
+               cap_evt.type = MQSV_EVT_MQP_REQ;
+               cap_evt.msg.mqp_req.type = MQP_EVT_CAP_SET_REQ;
+               cap_evt.msg.mqp_req.info.capacity.queueHandle = queueHandle;;
+               cap_evt.msg.mqp_req.info.capacity.thresholds = *thresholds;
+               cap_evt.msg.mqp_req.agent_mds_dest = mqa_cb->mqa_mds_dest;
+
+               /* send the request to the MQND */
+               mds_rc = mqa_mds_msg_sync_send(mqa_cb->mqa_mds_hdl,
+                                       &mqa_cb->mqnd_mds_dest,
+                                       &cap_evt,
+                                       &out_evt,
+                                       MQSV_WAIT_TIME);
+
+               switch (mds_rc) {
+                       case NCSCC_RC_SUCCESS:
+                               break;
+
+                       case NCSCC_RC_REQ_TIMOUT:
+                               TRACE_2("ERR_TIMEOUT: Message Send through MDS "
+                                               "Timeout %" PRIx64,
+                                       mqa_cb->mqa_mds_dest);
+                               rc = SA_AIS_ERR_TIMEOUT;
+                               break;
+
+                       case NCSCC_RC_FAILURE:
+                               TRACE_2("ERR_TRY_AGAIN: Message Send through "
+                                               "MDS Failure %" PRIx64,
+                                       mqa_cb->mqa_mds_dest);
+                               rc = SA_AIS_ERR_TRY_AGAIN;
+                               break;
+
+                       default:
+                               TRACE_4("ERR_RESOURCES: Message Send through "
+                                               "MDS Failure %" PRIx64,
+                                       mqa_cb->mqa_mds_dest);
+                               rc = SA_AIS_ERR_NO_RESOURCES;
+                               break;
+               }
+
+               if (mds_rc != NCSCC_RC_SUCCESS)
+                       break;
+
+               if (out_evt) {
+                       rc = out_evt->msg.mqp_rsp.error;
+                       m_MMGR_FREE_MQA_EVT(out_evt);
+               } else {
+                       TRACE_4("ERR_RESOURCES: Response not received from "
+                                       "MQND");
+                       rc = SA_AIS_ERR_NO_RESOURCES;
+               }
+       } while (false);
+
+       if (locked)
+               m_NCS_UNLOCK(&mqa_cb->cb_lock, NCS_LOCK_WRITE);
+
+       if (mqa_cb)
+               m_MQSV_MQA_GIVEUP_MQA_CB;
+
+       if (rc == SA_AIS_OK) {
+               TRACE_LEAVE2(" Success ");
+       } else {
+               TRACE_LEAVE2(" Failed with return code - %d ", rc);
+       }
+
+       return rc;
+}
+
+/****************************************************************************
+  Name          : saMsgQueueCapacityThresholdsGet
+
+  Description   : This routine gets the critical capacity thresholds for the
+                 queue.
+
+  Arguments     : SaMsgQueueHandleT queueHandle
+               : SaMsgQueueThresholdsT *thresholds
+
+  Return Values : SaAisErrorT
+
+  Notes         : None
+******************************************************************************/
+SaAisErrorT saMsgQueueCapacityThresholdsGet(SaMsgQueueHandleT queueHandle,
+               SaMsgQueueThresholdsT *thresholds)
+{
+       SaAisErrorT rc = SA_AIS_OK;
+       MQA_CB *mqa_cb;
+       bool locked = false;
+
+       do {
+               MQSV_EVT cap_evt;
+               MQSV_EVT *out_evt = 0;
+               MQA_QUEUE_INFO *queue_node;
+               uint8_t mds_rc;
+
+               TRACE_ENTER2("SaMsgQueueHandle %llu ", queueHandle);
+
+               /* retrieve MQA CB */
+               mqa_cb = (MQA_CB *)m_MQSV_MQA_RETRIEVE_MQA_CB;
+               if (!mqa_cb) {
+                       TRACE_2("ERR_BAD_HANDLE: Control block retrieval "
+                                       "failed");
+                       rc = SA_AIS_ERR_BAD_HANDLE;
+                       break;
+               }
+
+               if (m_NCS_LOCK(&mqa_cb->cb_lock, NCS_LOCK_WRITE) !=
+                               NCSCC_RC_SUCCESS) {
+                       TRACE_4("ERR_LIBRARY: Lock failed for control block "
+                                       "write");
+                       rc = SA_AIS_ERR_LIBRARY;
+                       break;
+               }
+
+               locked = true;
+
+               /* Check if queueHandle is present in the tree */
+               if ((queue_node = mqa_queue_tree_find_and_add(
+                       mqa_cb, queueHandle, false, NULL, 0)) == NULL) {
+                       TRACE_2("ERR_BAD_HANDLE: Queue Database Find Failed");
+                       rc = SA_AIS_ERR_BAD_HANDLE;
+                       break;
+               }
+
+               if (queue_node->client_info->version.majorVersion <
+                       MQA_MAJOR_VERSION) {
+                       TRACE_2("ERR_VERSION: client not B.03.01");
+                       rc = SA_AIS_ERR_VERSION;
+                       break;
+               }
+
+               if (queue_node->client_info->version.majorVersion ==
+                       MQA_MAJOR_VERSION) {
+                       if (!mqa_cb->clm_node_joined ||
+                               queue_node->client_info->isStale) {
+                               TRACE_2("ERR_UNAVAILABLE: node is not cluster "
+                                       "member");
+                               rc = SA_AIS_ERR_UNAVAILABLE;
+                               break;
+                       }
+               }
+
+               if (!thresholds) {
+                       TRACE_2("ERR_INVALID_PARAM: thresholds is 0");
+                       rc = SA_AIS_ERR_INVALID_PARAM;
+                       break;
+               }
+
+               /* Check if mqnd is up */
+               if (!mqa_cb->is_mqnd_up) {
+                       TRACE_2("ERR_TRY_AGAIN: MQD or MQND is down");
+                       rc = SA_AIS_ERR_TRY_AGAIN;
+                       break;
+               }
+
+               /* populate the structure */
+               memset(&cap_evt, 0, sizeof(MQSV_EVT));
+               cap_evt.type = MQSV_EVT_MQP_REQ;
+               cap_evt.msg.mqp_req.type = MQP_EVT_CAP_GET_REQ;
+               cap_evt.msg.mqp_req.info.capacity.queueHandle = queueHandle;;
+               cap_evt.msg.mqp_req.agent_mds_dest = mqa_cb->mqa_mds_dest;
+
+               /* send the request to the MQND */
+               mds_rc = mqa_mds_msg_sync_send(mqa_cb->mqa_mds_hdl,
+                                       &mqa_cb->mqnd_mds_dest,
+                                       &cap_evt,
+                                       &out_evt,
+                                       MQSV_WAIT_TIME);
+
+               switch (mds_rc) {
+                       case NCSCC_RC_SUCCESS:
+                               break;
+
+                       case NCSCC_RC_REQ_TIMOUT:
+                               TRACE_2("ERR_TIMEOUT: Message Send through MDS "
+                                               "Timeout %" PRIx64,
+                                       mqa_cb->mqa_mds_dest);
+                               rc = SA_AIS_ERR_TIMEOUT;
+                               break;
+
+                       case NCSCC_RC_FAILURE:
+                               TRACE_2("ERR_TRY_AGAIN: Message Send through "
+                                               "MDS Failure %" PRIx64,
+                                       mqa_cb->mqa_mds_dest);
+                               rc = SA_AIS_ERR_TRY_AGAIN;
+                               break;
+
+                       default:
+                               TRACE_4("ERR_RESOURCES: Message Send through "
+                                               "MDS Failure %" PRIx64,
+                                       mqa_cb->mqa_mds_dest);
+                               rc = SA_AIS_ERR_NO_RESOURCES;
+                               break;
+               }
+
+               if (mds_rc != NCSCC_RC_SUCCESS)
+                       break;
+
+               if (out_evt) {
+                       rc = out_evt->msg.mqp_rsp.error;
+
+                       if (rc == SA_AIS_OK) {
+                               *thresholds = out_evt->msg.mqp_rsp.info.
+                                       capacity.thresholds;
+                       }
+
+                       m_MMGR_FREE_MQA_EVT(out_evt);
+               } else {
+                       TRACE_4("ERR_RESOURCES: Response not received from "
+                                       "MQND");
+                       rc = SA_AIS_ERR_NO_RESOURCES;
+               }
+       } while (false);
+
+       if (locked)
+               m_NCS_UNLOCK(&mqa_cb->cb_lock, NCS_LOCK_WRITE);
+
+       if (mqa_cb)
+               m_MQSV_MQA_GIVEUP_MQA_CB;
+
+       if (rc == SA_AIS_OK) {
+               TRACE_LEAVE2(" Success ");
+       } else {
+               TRACE_LEAVE2(" Failed with return code - %d ", rc);
+       }
+
+       return rc;
+}
 /*****************************************************************************
   Name          : msgget_timer_expired
 
diff --git a/src/msg/apitest/test_CapacityThresholds.cc 
b/src/msg/apitest/test_CapacityThresholds.cc
new file mode 100644
index 0000000..38d2144
--- /dev/null
+++ b/src/msg/apitest/test_CapacityThresholds.cc
@@ -0,0 +1,1049 @@
+#include <cassert>
+#include <condition_variable>
+#include <cstdio>
+#include <cstring>
+#include <iostream>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <sys/poll.h>
+#include "msg/apitest/msgtest.h"
+#include "msg/apitest/tet_mqsv.h"
+#include <saMsg.h>
+#include <saNtf.h>
+
+static SaVersionT msg3_1 = {'B', 3, 0};
+
+typedef std::queue<SaMsgMessageCapacityStatusT> TestQueue;
+typedef std::queue<int> ResultQueue;
+static TestQueue testQueue;
+static ResultQueue resultQueue;
+static std::mutex m;
+static std::condition_variable cv;
+static bool ready(false);
+static bool pollDone(false);
+
+static const SaNameT queueName = {
+  sizeof("safMq=TestQueue") - 1,
+  "safMq=TestQueue"
+};
+
+static const SaNameT queueGroupName = {
+  sizeof("safMqg=TestQueueGroup") - 1,
+  "safMqg=TestQueueGroup"
+};
+
+static const SaMsgQueueCreationAttributesT creationAttributes = {
+  0,
+  { 5, 5, 5, 5 },
+  SA_TIME_ONE_SECOND
+};
+
+static SaMsgQueueHandleT openQueue(SaMsgHandleT msgHandle) {
+  SaMsgQueueHandleT queueHandle;
+
+  SaAisErrorT rc(saMsgQueueOpen(msgHandle,
+                                &queueName,
+                                &creationAttributes,
+                                SA_MSG_QUEUE_CREATE,
+                                SA_TIME_MAX,
+                                &queueHandle));
+  assert(rc == SA_AIS_OK);
+
+  return queueHandle;
+}
+
+
+static void ntfCallback(SaNtfSubscriptionIdT subscriptionId,
+                        const SaNtfNotificationsT *n) {
+  int asyncTestResult(TET_FAIL);
+  int asyncTest(testQueue.front());
+  testQueue.pop();
+
+  do {
+    if (n->notificationType != SA_NTF_TYPE_STATE_CHANGE) {
+      break;
+    }
+
+    if 
(n->notification.stateChangeNotification.notificationHeader.notificationClassId->vendorId
 != SA_NTF_VENDOR_ID_SAF) {
+      break;
+    }
+
+    if 
(n->notification.stateChangeNotification.notificationHeader.notificationClassId->majorId
 != SA_SVC_MSG) {
+      break;
+    }
+
+    if (n->notification.stateChangeNotification.numStateChanges != 1) {
+      break;
+    }
+
+    if (*n->notification.stateChangeNotification.sourceIndicator != 
SA_NTF_OBJECT_OPERATION) {
+      break;
+    }
+
+    if (n->notification.stateChangeNotification.changedStates[0].stateId != 
SA_MSG_DEST_CAPACITY_STATUS) {
+      break;
+    }
+
+    if 
(n->notification.stateChangeNotification.notificationHeader.notifyingObject->length
 != sizeof("safApp=safMsgService") - 1) {
+      break;
+    }
+
+    if 
(memcmp(n->notification.stateChangeNotification.notificationHeader.notifyingObject->value,
+               "safApp=safMsgService",
+               
n->notification.stateChangeNotification.notificationHeader.notifyingObject->length))
 {
+      break;
+    }
+
+    if (asyncTest == SA_MSG_QUEUE_CAPACITY_REACHED) {
+      if 
(n->notification.stateChangeNotification.notificationHeader.notificationClassId->minorId
 != 0x65) {
+        break;
+      }
+
+      if (n->notification.stateChangeNotification.changedStates[0].newState != 
SA_MSG_QUEUE_CAPACITY_REACHED) {
+        break;
+      }
+
+      if 
(n->notification.stateChangeNotification.changedStates[0].oldStatePresent != 
SA_FALSE) {
+        break;
+      }
+
+      asyncTestResult = TET_PASS;
+    } else if (asyncTest == SA_MSG_QUEUE_CAPACITY_AVAILABLE) {
+      if 
(n->notification.stateChangeNotification.notificationHeader.notificationClassId->minorId
 != 0x66) {
+        break;
+      }
+
+      if (n->notification.stateChangeNotification.changedStates[0].newState != 
SA_MSG_QUEUE_CAPACITY_AVAILABLE) {
+        break;
+      }
+
+      if 
(n->notification.stateChangeNotification.changedStates[0].oldStatePresent != 
SA_TRUE) {
+        break;
+      }
+
+      if (n->notification.stateChangeNotification.changedStates[0].oldState != 
SA_MSG_QUEUE_CAPACITY_REACHED) {
+        break;
+      }
+
+      asyncTestResult = TET_PASS;
+    } else if (asyncTest == SA_MSG_QUEUE_GROUP_CAPACITY_REACHED) {
+      if 
(n->notification.stateChangeNotification.notificationHeader.notificationClassId->minorId
 != 0x67) {
+        break;
+      }
+
+      if (n->notification.stateChangeNotification.changedStates[0].newState != 
SA_MSG_QUEUE_GROUP_CAPACITY_REACHED) {
+        break;
+      }
+
+      if 
(n->notification.stateChangeNotification.changedStates[0].oldStatePresent != 
SA_FALSE) {
+        break;
+      }
+
+      asyncTestResult = TET_PASS;
+    } else if (asyncTest == SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE) {
+      if 
(n->notification.stateChangeNotification.notificationHeader.notificationClassId->minorId
 != 0x68) {
+        break;
+      }
+
+      if (n->notification.stateChangeNotification.changedStates[0].newState != 
SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE) {
+        break;
+      }
+
+      if 
(n->notification.stateChangeNotification.changedStates[0].oldStatePresent != 
SA_TRUE) {
+        break;
+      }
+
+      if (n->notification.stateChangeNotification.changedStates[0].oldState != 
SA_MSG_QUEUE_GROUP_CAPACITY_REACHED) {
+        break;
+      }
+
+      asyncTestResult = TET_PASS;
+    } else {
+      assert(false);
+      break;
+    }
+  } while (false);
+
+  resultQueue.push(asyncTestResult);
+
+  if (testQueue.empty()) {
+    {
+      std::lock_guard<std::mutex> lk(m);
+      ready = true;
+    }
+
+    cv.notify_one();
+  }
+}
+
+static void testNtfStateChange(SaNtfHandleT ntfHandle,
+                               SaSelectionObjectT ntfSelObj) {
+  pollfd fd = { static_cast<int>(ntfSelObj), POLLIN };
+
+  do {
+    if (pollDone)
+      break;
+
+    if (poll(&fd, 1, 1000) < 0) {
+      m_TET_MQSV_PRINTF("poll FAILED: %i\n", errno);
+      break;
+    }
+
+    if (fd.revents & POLLIN) {
+      SaAisErrorT rc(saNtfDispatch(ntfHandle, SA_DISPATCH_ONE));
+
+      if (rc != SA_AIS_OK) {
+        if (rc != SA_AIS_ERR_BAD_HANDLE)
+          std::cerr << "saNtfDispatch failed: " << rc << std::endl;
+        break;
+      }
+    }
+  } while (true);
+}
+
+static int testNtfInit(SaNtfHandleT& ntfHandle, SaSelectionObjectT& ntfSelObj) 
{
+  int result(TET_PASS);
+
+  do {
+    SaNtfCallbacksT callbacks = { ntfCallback, 0 };
+    SaVersionT version = { 'A', 1, 0 };
+    SaNtfStateChangeNotificationFilterT stateChangeFilter;
+
+    SaAisErrorT rc(saNtfInitialize(&ntfHandle, &callbacks, &version));
+    if (rc != SA_AIS_OK) {
+      result = TET_FAIL;
+      break;
+    }
+
+    rc = saNtfStateChangeNotificationFilterAllocate(
+      ntfHandle, &stateChangeFilter, 0, 0, 0, 0, 0, 0);
+
+    if (rc != SA_AIS_OK) {
+      result = TET_FAIL;
+      break;
+    }
+
+    SaNtfNotificationTypeFilterHandlesT notificationFilterHandles = {
+      0, 0, stateChangeFilter.notificationFilterHandle, 0, 0
+    };
+
+    rc = saNtfNotificationSubscribe(&notificationFilterHandles, 1);
+
+    if (rc != SA_AIS_OK) {
+      result = TET_FAIL;
+      break;
+    }
+
+    rc = 
saNtfNotificationFilterFree(stateChangeFilter.notificationFilterHandle);
+    if (rc != SA_AIS_OK) {
+      result = TET_FAIL;
+      break;
+    }
+
+    rc = saNtfSelectionObjectGet(ntfHandle, &ntfSelObj);
+    if (rc != SA_AIS_OK) {
+      result = TET_FAIL;
+      break;
+    }
+
+    pollDone = false;
+  } while (false);
+
+  return result;
+}
+
+static int testNtfCleanup(SaNtfHandleT ntfHandle) {
+  int result(TET_PASS);
+  SaAisErrorT rc(saNtfFinalize(ntfHandle));
+  if (rc != SA_AIS_OK)
+    result = TET_FAIL;
+
+  pollDone = true;
+
+  return result;
+}
+
+static void capacityThresholds_01(void) {
+  SaMsgQueueThresholdsT thresholds;
+  SaAisErrorT rc(saMsgQueueCapacityThresholdsSet(0xdeadbeef, &thresholds));
+  aisrc_validate(rc, SA_AIS_ERR_BAD_HANDLE);
+}
+
+static void capacityThresholds_02(void) {
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueThresholdsT thresholds;
+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_BAD_HANDLE);
+}
+
+static void capacityThresholds_03(void) {
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  rc = saMsgQueueClose(queueHandle);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueThresholdsT thresholds;
+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_BAD_HANDLE);
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+}
+
+static void capacityThresholds_04(void) {
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, 0);
+  aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM);
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+}
+
+static void capacityThresholds_05(void) {
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  const SaMsgQueueThresholdsT thresholds = {
+    { 4, 5, 5, 5 }, { 5, 5, 5, 5 }
+  };
+
+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM);
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+}
+
+static void capacityThresholds_06(void) {
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  const SaMsgQueueThresholdsT thresholds = {
+    { 5, 4, 5, 5 }, { 5, 5, 5, 5 }
+  };
+
+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM);
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+}
+
+static void capacityThresholds_07(void) {
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  const SaMsgQueueThresholdsT thresholds = {
+    { 5, 5, 4, 5 }, { 5, 5, 5, 5 }
+  };
+
+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM);
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+}
+
+static void capacityThresholds_08(void) {
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  const SaMsgQueueThresholdsT thresholds = {
+    { 5, 5, 5, 4 }, { 5, 5, 5, 5 }
+  };
+
+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM);
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+}
+
+static void capacityThresholds_09(void) {
+  SaMsgHandleT msgHandle;
+  SaVersionT msg1_1 = {'B', 1, 0};
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg1_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  const SaMsgQueueThresholdsT thresholds = {
+    { 5, 5, 5, 5 }, { 5, 5, 5, 5 }
+  };
+
+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_VERSION);
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+}
+
+static void capacityThresholds_10(void) {
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  const SaMsgQueueThresholdsT thresholds = {
+    { 6, 5, 5, 5 }, { 5, 5, 5, 5 }
+  };
+
+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM);
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+}
+
+static void capacityThresholds_11(void) {
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  const SaMsgQueueThresholdsT thresholds = {
+    { 5, 6, 5, 5 }, { 5, 5, 5, 5 }
+  };
+
+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM);
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+}
+
+static void capacityThresholds_12(void) {
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  const SaMsgQueueThresholdsT thresholds = {
+    { 5, 5, 6, 5 }, { 5, 5, 5, 5 }
+  };
+
+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM);
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+}
+
+static void capacityThresholds_13(void) {
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  const SaMsgQueueThresholdsT thresholds = {
+    { 5, 5, 5, 6 }, { 5, 5, 5, 5 }
+  };
+
+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM);
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+}
+
+static void capacityThresholds_14(void) {
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  const SaMsgQueueThresholdsT thresholds = {
+    { 5, 5, 5, 5 }, { 5, 5, 5, 5 }
+  };
+
+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+  aisrc_validate(rc, SA_AIS_OK);
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+}
+
+static void capacityThresholds_15(void) {
+  SaMsgQueueThresholdsT thresholds;
+  SaAisErrorT rc(saMsgQueueCapacityThresholdsGet(0xdeadbeef, &thresholds));
+  aisrc_validate(rc, SA_AIS_ERR_BAD_HANDLE);
+}
+
+static void capacityThresholds_16(void) {
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueThresholdsT thresholds;
+  rc = saMsgQueueCapacityThresholdsGet(queueHandle, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_BAD_HANDLE);
+}
+
+static void capacityThresholds_17(void) {
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  rc = saMsgQueueClose(queueHandle);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueThresholdsT thresholds;
+  rc = saMsgQueueCapacityThresholdsGet(queueHandle, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_BAD_HANDLE);
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+}
+
+static void capacityThresholds_18(void) {
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  rc = saMsgQueueCapacityThresholdsGet(queueHandle, 0);
+  aisrc_validate(rc, SA_AIS_ERR_INVALID_PARAM);
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+}
+
+static void capacityThresholds_19(void) {
+  SaMsgHandleT msgHandle;
+  SaVersionT msg1_1 = {'B', 1, 0};
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg1_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  SaMsgQueueThresholdsT thresholds;
+  rc = saMsgQueueCapacityThresholdsGet(queueHandle, &thresholds);
+  aisrc_validate(rc, SA_AIS_ERR_VERSION);
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+}
+
+static void capacityThresholds_20(void) {
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+  const SaMsgQueueThresholdsT thresholds = {
+    { 1, 2, 3, 4 }, { 1, 2, 3, 4 }
+  };
+
+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueThresholdsT thresholdsGet;
+  rc = saMsgQueueCapacityThresholdsGet(queueHandle, &thresholdsGet);
+  aisrc_validate(rc, SA_AIS_OK);
+
+  assert(!memcmp(&thresholds, &thresholdsGet, sizeof(SaMsgQueueThresholdsT)));
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+}
+
+static void capacityThresholds_21(void) {
+  int result(TET_PASS);
+
+  do {
+    SaMsgHandleT msgHandle;
+    SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+    assert(rc == SA_AIS_OK);
+
+    SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+    const SaMsgQueueThresholdsT thresholds = {
+      { 3, 3, 3, 3 }, { 2, 2, 2, 2 }
+    };
+
+    rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+    assert(rc == SA_AIS_OK);
+
+    SaNtfHandleT ntfHandle;
+    SaSelectionObjectT ntfSelObj;
+    result = testNtfInit(ntfHandle, ntfSelObj);
+    if (result != TET_PASS)
+      break;
+
+    ready = false;
+    std::thread t(&testNtfStateChange, ntfHandle, ntfSelObj);
+    testQueue.push(SA_MSG_QUEUE_CAPACITY_REACHED);
+
+    /* fill up the q */
+    for (SaUint8T i(SA_MSG_MESSAGE_HIGHEST_PRIORITY);
+         i <= SA_MSG_MESSAGE_LOWEST_PRIORITY;
+         i++) {
+      char data[] = "abcd";
+
+      SaMsgMessageT message = {
+        1, 1, sizeof(data) - 1, 0, data, i
+      };
+
+      rc = saMsgMessageSend(msgHandle, &queueName, &message, 
SA_TIME_ONE_SECOND);
+      assert(rc == SA_AIS_OK);
+    }
+
+    // wait for the notifications
+    {
+      std::unique_lock<std::mutex> lk(m);
+      cv.wait(lk, []{ return ready; });
+    }
+
+    result = resultQueue.front();
+    resultQueue.pop();
+    if (result != TET_PASS)
+      break;
+
+    result = testNtfCleanup(ntfHandle);
+    if (result != TET_PASS)
+      break;
+
+    rc = saMsgFinalize(msgHandle);
+    assert(rc == SA_AIS_OK);
+
+    t.join();
+  } while (false);
+
+  test_validate(result, TET_PASS);
+
+  // let the queue close
+  sleep(2);
+}
+
+static void capacityThresholds_22(void) {
+  int result(TET_PASS);
+
+  do {
+    SaMsgHandleT msgHandle;
+    SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+    assert(rc == SA_AIS_OK);
+
+    SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+    const SaMsgQueueThresholdsT thresholds = {
+      { 3, 3, 3, 3 }, { 2, 2, 2, 2 }
+    };
+
+    rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+    assert(rc == SA_AIS_OK);
+
+    SaNtfHandleT ntfHandle;
+    SaSelectionObjectT ntfSelObj;
+    result = testNtfInit(ntfHandle, ntfSelObj);
+    if (result != TET_PASS)
+      break;
+
+    ready = false;
+    std::thread t(&testNtfStateChange, ntfHandle, ntfSelObj);
+    testQueue.push(SA_MSG_QUEUE_CAPACITY_REACHED);
+
+    /* fill up the q */
+    for (SaUint8T i(SA_MSG_MESSAGE_HIGHEST_PRIORITY);
+         i <= SA_MSG_MESSAGE_LOWEST_PRIORITY;
+         i++) {
+      char data[] = "abcd";
+
+      SaMsgMessageT message = {
+        1, 1, sizeof(data) - 1, 0, data, i
+      };
+
+      rc = saMsgMessageSend(msgHandle, &queueName, &message, 
SA_TIME_ONE_SECOND);
+      assert(rc == SA_AIS_OK);
+    }
+
+    // wait for the notifications
+    {
+      std::unique_lock<std::mutex> lk(m);
+      cv.wait(lk, []{ return ready; });
+    }
+
+    result = resultQueue.front();
+    resultQueue.pop();
+    if (result != TET_PASS)
+      break;
+
+    ready = false;
+
+    testQueue.push(SA_MSG_QUEUE_CAPACITY_AVAILABLE);
+
+    /* read a message */
+    char data[4];
+    SaMsgMessageT msg;
+    SaMsgSenderIdT senderId;
+    memset(&msg, 0, sizeof(msg));
+    msg.data = &data;
+    msg.size = sizeof(data);
+
+    rc = saMsgMessageGet(queueHandle, &msg, 0, &senderId, SA_TIME_ONE_SECOND);
+    assert(rc == SA_AIS_OK);
+
+    // wait for the notifications
+    {
+      std::unique_lock<std::mutex> lk(m);
+      cv.wait(lk, []{ return ready; });
+    }
+
+    result = resultQueue.front();
+    resultQueue.pop();
+    if (result != TET_PASS)
+      break;
+
+    result = testNtfCleanup(ntfHandle);
+    if (result != TET_PASS)
+      break;
+
+    rc = saMsgFinalize(msgHandle);
+    assert(rc == SA_AIS_OK);
+
+    t.join();
+  } while (false);
+
+  test_validate(result, TET_PASS);
+
+  // let the queue close
+  sleep(2);
+}
+
+static void capacityThresholds_23(void) {
+  int result(TET_PASS);
+
+  do {
+    SaMsgHandleT msgHandle;
+    SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+    assert(rc == SA_AIS_OK);
+
+    SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+    const SaMsgQueueThresholdsT thresholds = {
+      { 3, 3, 3, 3 }, { 2, 2, 2, 2 }
+    };
+
+    rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+    assert(rc == SA_AIS_OK);
+
+    rc = saMsgQueueGroupCreate(msgHandle,
+                               &queueGroupName,
+                               SA_MSG_QUEUE_GROUP_ROUND_ROBIN);
+    assert(rc == SA_AIS_OK);
+
+    rc = saMsgQueueGroupInsert(msgHandle, &queueGroupName, &queueName);
+    assert(rc == SA_AIS_OK);
+
+    SaNtfHandleT ntfHandle;
+    SaSelectionObjectT ntfSelObj;
+    result = testNtfInit(ntfHandle, ntfSelObj);
+    if (result != TET_PASS)
+      break;
+
+    ready = false;
+    std::thread t(&testNtfStateChange, ntfHandle, ntfSelObj);
+    testQueue.push(SA_MSG_QUEUE_CAPACITY_REACHED);
+    testQueue.push(SA_MSG_QUEUE_GROUP_CAPACITY_REACHED);
+
+    /* fill up the q */
+    for (SaUint8T i(SA_MSG_MESSAGE_HIGHEST_PRIORITY);
+         i <= SA_MSG_MESSAGE_LOWEST_PRIORITY;
+         i++) {
+      char data[] = "abcd";
+
+      SaMsgMessageT message = {
+        1, 1, sizeof(data) - 1, 0, data, i
+      };
+
+      rc = saMsgMessageSend(msgHandle,
+                            &queueGroupName,
+                            &message,
+                            SA_TIME_ONE_SECOND);
+      assert(rc == SA_AIS_OK);
+    }
+
+    // wait for the notifications
+    {
+      std::unique_lock<std::mutex> lk(m);
+      cv.wait(lk, []{ return ready; });
+    }
+
+    ready = false;
+
+    result = resultQueue.front();
+    resultQueue.pop();
+    if (result != TET_PASS)
+      break;
+
+    result = resultQueue.front();
+    resultQueue.pop();
+    if (result != TET_PASS)
+      break;
+
+    result = testNtfCleanup(ntfHandle);
+    if (result != TET_PASS) {
+      break;
+    }
+
+    rc = saMsgQueueGroupDelete(msgHandle, &queueGroupName);
+    assert(rc == SA_AIS_OK);
+
+    rc = saMsgFinalize(msgHandle);
+    assert(rc == SA_AIS_OK);
+
+    t.join();
+  } while (false);
+
+  test_validate(result, TET_PASS);
+
+  // let the queue close
+  sleep(2);
+}
+
+static void capacityThresholds_24(void) {
+  int result(TET_PASS);
+
+  do {
+    SaMsgHandleT msgHandle;
+    SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+    assert(rc == SA_AIS_OK);
+
+    SaMsgQueueHandleT queueHandle(openQueue(msgHandle));
+
+    const SaMsgQueueThresholdsT thresholds = {
+      { 3, 3, 3, 3 }, { 2, 2, 2, 2 }
+    };
+
+    rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+    assert(rc == SA_AIS_OK);
+
+    rc = saMsgQueueGroupCreate(msgHandle,
+                               &queueGroupName,
+                               SA_MSG_QUEUE_GROUP_ROUND_ROBIN);
+    assert(rc == SA_AIS_OK);
+
+    rc = saMsgQueueGroupInsert(msgHandle, &queueGroupName, &queueName);
+    assert(rc == SA_AIS_OK);
+
+    SaNtfHandleT ntfHandle;
+    SaSelectionObjectT ntfSelObj;
+    result = testNtfInit(ntfHandle, ntfSelObj);
+    if (result != TET_PASS) {
+      break;
+    }
+
+    ready = false;
+    std::thread t(&testNtfStateChange, ntfHandle, ntfSelObj);
+    testQueue.push(SA_MSG_QUEUE_CAPACITY_REACHED);
+    testQueue.push(SA_MSG_QUEUE_GROUP_CAPACITY_REACHED);
+
+    /* fill up the q */
+    for (SaUint8T i(SA_MSG_MESSAGE_HIGHEST_PRIORITY);
+         i <= SA_MSG_MESSAGE_LOWEST_PRIORITY;
+         i++) {
+      char data[] = "abcd";
+
+      SaMsgMessageT message = {
+        1, 1, sizeof(data) - 1, 0, data, i
+      };
+
+      rc = saMsgMessageSend(msgHandle, &queueName, &message, 
SA_TIME_ONE_SECOND);
+      assert(rc == SA_AIS_OK);
+    }
+
+    // wait for the notifications
+    {
+      std::unique_lock<std::mutex> lk(m);
+      cv.wait(lk, []{ return ready; });
+    }
+
+    result = resultQueue.front();
+    resultQueue.pop();
+    if (result != TET_PASS)
+      break;
+
+    result = resultQueue.front();
+    resultQueue.pop();
+    if (result != TET_PASS)
+      break;
+
+    ready = false;
+
+    testQueue.push(SA_MSG_QUEUE_CAPACITY_AVAILABLE);
+    testQueue.push(SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE);
+
+    /* read a message */
+    char data[4];
+    SaMsgMessageT msg;
+    SaMsgSenderIdT senderId;
+    memset(&msg, 0, sizeof(msg));
+    msg.data = &data;
+    msg.size = sizeof(data);
+
+    rc = saMsgMessageGet(queueHandle, &msg, 0, &senderId, SA_TIME_ONE_SECOND);
+    assert(rc == SA_AIS_OK);
+
+    {
+      std::unique_lock<std::mutex> lk(m);
+      cv.wait(lk, []{ return ready; });
+    }
+
+    result = resultQueue.front();
+    resultQueue.pop();
+    if (result != TET_PASS)
+      break;
+
+    result = resultQueue.front();
+    resultQueue.pop();
+    if (result != TET_PASS)
+      break;
+
+    result = testNtfCleanup(ntfHandle);
+    if (result != TET_PASS) {
+      break;
+    }
+
+    rc = saMsgQueueGroupDelete(msgHandle, &queueGroupName);
+    assert(rc == SA_AIS_OK);
+
+    rc = saMsgFinalize(msgHandle);
+    assert(rc == SA_AIS_OK);
+
+    t.join();
+  } while (false);
+
+  test_validate(result, TET_PASS);
+
+  // let the queue close
+  sleep(2);
+}
+
+__attribute__((constructor)) static void capacityThresholds_constructor(void) {
+  test_suite_add(27, "Capacity Thresholds Test Suite");
+  test_case_add(27,
+                capacityThresholds_01,
+                "saMsgQueueCapacityThresholdsSet returns BAD_HANDLE when "
+                "queueHandle is bad");
+  test_case_add(27,
+                capacityThresholds_02,
+                "saMsgQueueCapacityThresholdsSet with finalized handle");
+  test_case_add(27,
+                capacityThresholds_03,
+                "saMsgQueueCapacityThresholdsSet with closed queue");
+  test_case_add(27,
+                capacityThresholds_04,
+                "saMsgQueueCapacityThresholdsSet with null thresholds 
pointer");
+  test_case_add(27,
+                capacityThresholds_05,
+                "saMsgQueueCapacityThresholdsSet with capacityAvailable > "
+                "capacityReached (1)");
+  test_case_add(27,
+                capacityThresholds_06,
+                "saMsgQueueCapacityThresholdsSet with capacityAvailable > "
+                "capacityReached (2)");
+  test_case_add(27,
+                capacityThresholds_07,
+                "saMsgQueueCapacityThresholdsSet with capacityAvailable > "
+                "capacityReached (3)");
+  test_case_add(27,
+                capacityThresholds_08,
+                "saMsgQueueCapacityThresholdsSet with capacityAvailable > "
+                "capacityReached (4)");
+  test_case_add(27,
+                capacityThresholds_09,
+                "saMsgQueueCapacityThresholdsSet with version != B.03.01");
+  test_case_add(27,
+                capacityThresholds_10,
+                "saMsgQueueCapacityThresholdsSet with size < capacityReached "
+                "(1)");
+  test_case_add(27,
+                capacityThresholds_11,
+                "saMsgQueueCapacityThresholdsSet with size < capacityReached "
+                "(2)");
+  test_case_add(27,
+                capacityThresholds_12,
+                "saMsgQueueCapacityThresholdsSet with size < capacityReached "
+                "(3)");
+  test_case_add(27,
+                capacityThresholds_13,
+                "saMsgQueueCapacityThresholdsSet with size < capacityReached "
+                "(4)");
+  test_case_add(27,
+                capacityThresholds_14,
+                "saMsgQueueCapacityThresholdsSet success");
+  test_case_add(27,
+                capacityThresholds_15,
+                "saMsgQueueCapacityThresholdsGet returns BAD_HANDLE when "
+                "queueHandle is bad");
+  test_case_add(27,
+                capacityThresholds_16,
+                "saMsgQueueCapacityThresholdsGet with finalized handle");
+  test_case_add(27,
+                capacityThresholds_17,
+                "saMsgQueueCapacityThresholdsGet with closed queue");
+  test_case_add(27,
+                capacityThresholds_18,
+                "saMsgQueueCapacityThresholdsGet with null thresholds 
pointer");
+  test_case_add(27,
+                capacityThresholds_19,
+                "saMsgQueueCapacityThresholdsGet with version != B.03.01");
+  test_case_add(27,
+                capacityThresholds_20,
+                "saMsgQueueCapacityThresholdsGet success");
+  test_case_add(27,
+                capacityThresholds_21,
+                "Capacity Reached Notification sent");
+  test_case_add(27,
+                capacityThresholds_22,
+                "Capacity Available Notification sent");
+  test_case_add(27,
+                capacityThresholds_23,
+                "Capacity Reached Notification sent (group)");
+  test_case_add(27,
+                capacityThresholds_24,
+                "Capacity Available Notification sent (group)");
+}
diff --git a/src/msg/apitest/test_ErrUnavailable.cc 
b/src/msg/apitest/test_ErrUnavailable.cc
index 3f37f8d..dc1b371 100644
--- a/src/msg/apitest/test_ErrUnavailable.cc
+++ b/src/msg/apitest/test_ErrUnavailable.cc
@@ -504,6 +504,59 @@ static void saErrUnavailable_25(void)
   assert(rc == SA_AIS_OK);
 }
 
+static void saErrUnavailable_26(void)
+{
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle;
+  rc = saMsgQueueOpen(msgHandle,
+                      &queueName,
+                      &creationAttributes,
+                      SA_MSG_QUEUE_CREATE,
+                      SA_TIME_END,
+                      &queueHandle);
+  assert(rc == SA_AIS_OK);
+
+  lockUnlockNode(true);
+  const SaMsgQueueThresholdsT thresholds = {
+       { 5, 5, 5, 5 },
+       { 5, 5, 5, 5 }
+  };
+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+  test_validate(rc, SA_AIS_ERR_UNAVAILABLE);
+  lockUnlockNode(false);
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+}
+
+static void saErrUnavailable_27(void)
+{
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle;
+  rc = saMsgQueueOpen(msgHandle,
+                      &queueName,
+                      &creationAttributes,
+                      SA_MSG_QUEUE_CREATE,
+                      SA_TIME_END,
+                      &queueHandle);
+  assert(rc == SA_AIS_OK);
+
+  lockUnlockNode(true);
+  SaMsgQueueThresholdsT thresholds;
+  rc = saMsgQueueCapacityThresholdsGet(queueHandle, &thresholds);
+  test_validate(rc, SA_AIS_ERR_UNAVAILABLE);
+  lockUnlockNode(false);
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+}
+
 static void saErrUnavailable_30(void)
 {
   lockUnlockNode(true);
@@ -940,6 +993,59 @@ static void saErrUnavailable_54(void)
   assert(rc == SA_AIS_OK);
 }
 
+static void saErrUnavailable_55(void)
+{
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle;
+  rc = saMsgQueueOpen(msgHandle,
+                      &queueName,
+                      &creationAttributes,
+                      SA_MSG_QUEUE_CREATE,
+                      SA_TIME_END,
+                      &queueHandle);
+  assert(rc == SA_AIS_OK);
+
+  lockUnlockNode(true);
+  lockUnlockNode(false);
+  const SaMsgQueueThresholdsT thresholds = {
+       { 5, 5, 5, 5 },
+       { 5, 5, 5, 5 }
+  };
+  rc = saMsgQueueCapacityThresholdsSet(queueHandle, &thresholds);
+  test_validate(rc, SA_AIS_ERR_UNAVAILABLE);
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+}
+
+static void saErrUnavailable_56(void)
+{
+  SaMsgHandleT msgHandle;
+  SaAisErrorT rc = saMsgInitialize(&msgHandle, 0, &msg3_1);
+  assert(rc == SA_AIS_OK);
+
+  SaMsgQueueHandleT queueHandle;
+  rc = saMsgQueueOpen(msgHandle,
+                      &queueName,
+                      &creationAttributes,
+                      SA_MSG_QUEUE_CREATE,
+                      SA_TIME_END,
+                      &queueHandle);
+  assert(rc == SA_AIS_OK);
+
+  lockUnlockNode(true);
+  lockUnlockNode(false);
+  SaMsgQueueThresholdsT thresholds;
+  rc = saMsgQueueCapacityThresholdsGet(queueHandle, &thresholds);
+  test_validate(rc, SA_AIS_ERR_UNAVAILABLE);
+
+  rc = saMsgFinalize(msgHandle);
+  assert(rc == SA_AIS_OK);
+}
+
 
 __attribute__((constructor)) static void saErrUnavailable_constructor(void)
 {
@@ -969,9 +1075,9 @@ __attribute__((constructor)) static void 
saErrUnavailable_constructor(void)
   test_case_add(26, saErrUnavailable_23, "saMsgMessageSendReceive");
   test_case_add(26, saErrUnavailable_24, "saMsgMessageReply");
   test_case_add(26, saErrUnavailable_25, "saMsgMessageReplyAsync");
-#if 0
   test_case_add(26, saErrUnavailable_26, "saMsgQueueCapacityThresholdsSet");
   test_case_add(26, saErrUnavailable_27, "saMsgQueueCapacityThresholdsGet");
+#if 0
   test_case_add(26, saErrUnavailable_28, "saMsgMetadataSizeGet");
   test_case_add(26, saErrUnavailable_29, "saMsgLimitGet");
 #endif
@@ -1000,9 +1106,9 @@ __attribute__((constructor)) static void 
saErrUnavailable_constructor(void)
   test_case_add(26, saErrUnavailable_52, "saMsgMessageSendReceive (stale)");
   test_case_add(26, saErrUnavailable_53, "saMsgMessageReply (stale)");
   test_case_add(26, saErrUnavailable_54, "saMsgMessageReplyAsync (stale)");
-#if 0
   test_case_add(26, saErrUnavailable_55, "saMsgQueueCapacityThresholdsSet 
(stale)");
   test_case_add(26, saErrUnavailable_56, "saMsgQueueCapacityThresholdsGet 
(stale)");
+#if 0
   test_case_add(26, saErrUnavailable_57, "saMsgMetadataSizeGet (stale)");
   test_case_add(26, saErrUnavailable_58, "saMsgLimitGet (stale)");
 #endif
diff --git a/src/msg/common/mqsv_edu.c b/src/msg/common/mqsv_edu.c
index 7b0749b..a5301da 100644
--- a/src/msg/common/mqsv_edu.c
+++ b/src/msg/common/mqsv_edu.c
@@ -477,7 +477,8 @@ static int mqsv_mqp_req_test_type_fnc(NCSCONTEXT arg)
               LCL_TEST_JUMP_OFFSET_MQP_EVT_TRANSFER_QUEUE_COMPLETE = 60,
               LCL_TEST_JUMP_OFFSET_MQP_EVT_UPDATE_STATS = 62,
               LCL_TEST_JUMP_OFFSET_MQP_EVT_RET_TIME_SET_REQ = 65,
-              LCL_TEST_JUMP_OFFSET_MQP_EVT_CLM_NOTIFY = 67 };
+              LCL_TEST_JUMP_OFFSET_MQP_EVT_CLM_NOTIFY = 67,
+              LCL_TEST_JUMP_OFFSET_MQP_EVT_CAPACITY = 68 };
        MQP_REQ_TYPE type;
 
        if (arg == NULL)
@@ -516,6 +517,9 @@ static int mqsv_mqp_req_test_type_fnc(NCSCONTEXT arg)
                return LCL_TEST_JUMP_OFFSET_MQP_EVT_RET_TIME_SET_REQ;
        case MQP_EVT_CLM_NOTIFY:
                return LCL_TEST_JUMP_OFFSET_MQP_EVT_CLM_NOTIFY;
+       case MQP_EVT_CAP_SET_REQ:
+       case MQP_EVT_CAP_GET_REQ:
+               return LCL_TEST_JUMP_OFFSET_MQP_EVT_CAPACITY;
 
        default:
                break;
@@ -578,6 +582,55 @@ static uint32_t mqsv_edp_samsgqueuecreationattributest(
 
 /*****************************************************************************
 
+  PROCEDURE NAME:   mqsv_edp_samsgqueuethresholdst
+
+  DESCRIPTION:      EDU program handler for "SaMsgQueueThresholdsT" data. This
+function is invoked by EDU for performing encode/decode operation on
+"SaMsgQueueThresholdsT" data.
+
+  RETURNS:          NCSCC_RC_SUCCESS/NCSCC_RC_FAILURE
+
+*****************************************************************************/
+static uint32_t mqsv_edp_samsgqueuethresholdst(
+    EDU_HDL *hdl, EDU_TKN *edu_tkn, NCSCONTEXT ptr, uint32_t *ptr_data_len,
+    EDU_BUF_ENV *buf_env, EDP_OP_TYPE op, EDU_ERR *o_err)
+{
+       uint32_t rc = NCSCC_RC_SUCCESS;
+       SaMsgQueueThresholdsT *struct_ptr = NULL, **d_ptr = NULL;
+       EDU_INST_SET mqsv_samsgqueuethresholds_rules[] = {
+           {EDU_START, mqsv_edp_samsgqueuethresholdst, 0, 0, 0,
+            sizeof(SaMsgQueueThresholdsT), 0, NULL},
+           {EDU_EXEC, m_NCS_EDP_SASIZET, EDQ_ARRAY, 0, 0,
+            (long)&((SaMsgQueueThresholdsT *)0)->capacityReached,
+            SA_MSG_MESSAGE_LOWEST_PRIORITY + 1, NULL},
+           {EDU_EXEC, m_NCS_EDP_SASIZET, EDQ_ARRAY, 0, 0,
+            (long)&((SaMsgQueueThresholdsT *)0)->capacityAvailable,
+            SA_MSG_MESSAGE_LOWEST_PRIORITY + 1, NULL},
+           {EDU_END, 0, 0, 0, 0, 0, 0, NULL},
+       };
+
+       if (op == EDP_OP_TYPE_ENC) {
+               struct_ptr = (SaMsgQueueThresholdsT *)ptr;
+       } else if (op == EDP_OP_TYPE_DEC) {
+               d_ptr = (SaMsgQueueThresholdsT **)ptr;
+               if (*d_ptr == NULL) {
+                       /* This should have already been a valid pointer. */
+                       *o_err = EDU_ERR_MEM_FAIL;
+                       return NCSCC_RC_FAILURE;
+               }
+               memset(*d_ptr, '\0', sizeof(SaMsgQueueThresholdsT));
+               struct_ptr = *d_ptr;
+       } else {
+               struct_ptr = ptr;
+       }
+       rc = m_NCS_EDU_RUN_RULES(hdl, edu_tkn,
+                                mqsv_samsgqueuethresholds_rules,
+                                struct_ptr, ptr_data_len, buf_env, op, o_err);
+       return rc;
+}
+
+/*****************************************************************************
+
   PROCEDURE NAME:   mqsv_edp_mqp_req
 
   DESCRIPTION:      EDU program handler for "MQP_REQ_MSG" data. This
@@ -802,6 +855,13 @@ static uint32_t mqsv_edp_mqp_req(EDU_HDL *hdl, EDU_TKN 
*edu_tkn, NCSCONTEXT ptr,
            {EDU_EXEC, ncs_edp_uns32, 0, 0, 0,
             (long)&((MQP_REQ_MSG *)0)->info.clmNotify.node_joined, 0, NULL},
 
+           /* MQP_EVT_CAP_REQ */
+           {EDU_EXEC, m_NCS_EDP_SAMSGQUEUEHANDLET, 0, 0, 0,
+            (long)&((MQP_REQ_MSG *)0)->info.capacity.queueHandle, 0, NULL},
+           {EDU_EXEC, mqsv_edp_samsgqueuethresholdst, 0, 0, 0,
+            (long)&((MQP_REQ_MSG *)0)->info.capacity.thresholds,
+            0, NULL},
+
            {EDU_END, 0, 0, 0, 0, 0, 0, NULL},
 
        };
@@ -956,7 +1016,7 @@ static int mqsv_mqp_rsp_test_type_fnc(NCSCONTEXT arg)
               LCL_TEST_JUMP_OFFSET_MQP_EVT_TRANSFER_QUEUE_RSP,
               LCL_TEST_JUMP_OFFSET_MQP_EVT_ND_RESTART_RSP = 29,
               LCL_TEST_JUMP_OFFSET_MQP_EVT_RET_TIME_SET_RSP,
-
+              LCL_TEST_JUMP_OFFSET_MQP_EVT_CAP_RSP = 32
        };
        MQP_RSP_TYPE type;
 
@@ -988,6 +1048,9 @@ static int mqsv_mqp_rsp_test_type_fnc(NCSCONTEXT arg)
                return LCL_TEST_JUMP_OFFSET_MQP_EVT_ND_RESTART_RSP;
        case MQP_EVT_Q_RET_TIME_SET_RSP:
                return LCL_TEST_JUMP_OFFSET_MQP_EVT_RET_TIME_SET_RSP;
+       case MQP_EVT_CAP_SET_RSP:
+       case MQP_EVT_CAP_GET_RSP:
+               return LCL_TEST_JUMP_OFFSET_MQP_EVT_CAP_RSP;
        default:
                break;
        }
@@ -1109,6 +1172,13 @@ static uint32_t mqsv_edp_mqp_rsp(EDU_HDL *hdl, EDU_TKN 
*edu_tkn, NCSCONTEXT ptr,
             (long)&((MQP_RSP_MSG *)0)->info.retTimeSetRsp.queueHandle, 0,
             NULL},
 
+           /* MQP_EVT_CAP_RSP */
+           {EDU_EXEC, m_NCS_EDP_SAMSGQUEUEHANDLET, 0, 0, 0,
+            (long)&((MQP_RSP_MSG *)0)->info.capacity.queueHandle, 0, NULL},
+           {EDU_EXEC, mqsv_edp_samsgqueuethresholdst, 0, 0, 0,
+            (long)&((MQP_RSP_MSG *)0)->info.capacity.thresholds,
+            0, NULL},
+
            {EDU_END, 0, 0, 0, 0, 0, 0, NULL},
        };
 
diff --git a/src/msg/common/mqsv_evt.h b/src/msg/common/mqsv_evt.h
index ef7c739..4e26bf6 100644
--- a/src/msg/common/mqsv_evt.h
+++ b/src/msg/common/mqsv_evt.h
@@ -62,7 +62,9 @@ typedef enum mqp_req_type {
   MQP_EVT_SEND_MSG_ASYNC,
   MQP_EVT_CB_DUMP, /* For CPND CB Dump */
   MQP_EVT_Q_RET_TIME_SET_REQ,
-  MQP_EVT_CLM_NOTIFY
+  MQP_EVT_CLM_NOTIFY,
+  MQP_EVT_CAP_SET_REQ,
+  MQP_EVT_CAP_GET_REQ
 } MQP_REQ_TYPE;
 
 /* Enums for MQP Message Types */
@@ -81,7 +83,9 @@ typedef enum mqp_rsp_type {
   MQP_EVT_TRANSFER_QUEUE_RSP,
   MQP_EVT_MQND_RESTART_RSP,
   MQP_EVT_STAT_UPD_RSP,
-  MQP_EVT_Q_RET_TIME_SET_RSP
+  MQP_EVT_Q_RET_TIME_SET_RSP,
+  MQP_EVT_CAP_SET_RSP,
+  MQP_EVT_CAP_GET_RSP
 } MQP_RSP_TYPE;
 
 /* Enums for MQD messages */
@@ -188,6 +192,11 @@ typedef struct mqp_q_ret_time_set_rsp {
   SaMsgQueueHandleT queueHandle;
 } MQP_Q_RET_TIME_SET_RSP;
 
+typedef struct mqp_capacity {
+  SaMsgQueueHandleT queueHandle;
+  SaMsgQueueThresholdsT thresholds;
+} MQP_CAPACITY;
+
 /* saMsgQueueOpen(): */
 
 typedef struct mqp_open_req {
@@ -396,6 +405,7 @@ typedef struct mqp_req_msg {
     MQP_UPDATE_STATS statsReq;
     MQP_Q_RET_TIME_SET_REQ retTimeSetReq;
     MQP_CLM_NOTIFY clmNotify;
+    MQP_CAPACITY capacity;
   } info;
 } MQP_REQ_MSG;
 
@@ -415,6 +425,7 @@ typedef struct mqp_rsp_msg {
     MQP_QUEUE_REPLY_RSP replyRsp;
     MQP_SEND_MSG_RSP sendMsgRsp;
     MQP_Q_RET_TIME_SET_RSP retTimeSetRsp;
+    MQP_CAPACITY capacity;
   } info;
 } MQP_RSP_MSG;
 
diff --git a/src/msg/msgd/mqd_api.c b/src/msg/msgd/mqd_api.c
index 8416851..83d5c21 100644
--- a/src/msg/msgd/mqd_api.c
+++ b/src/msg/msgd/mqd_api.c
@@ -49,10 +49,11 @@
 
 #include "msg/msgd/mqd.h"
 #include "mqd_imm.h"
+#include "mqd_ntf.h"
 
 MQDLIB_INFO gl_mqdinfo;
 
-enum { FD_TERM = 0, FD_AMF, FD_MBCSV, FD_MBX, FD_CLM, NUM_FD };
+enum { FD_TERM = 0, FD_AMF, FD_MBCSV, FD_MBX, FD_CLM, FD_NTF, NUM_FD };
 
 static struct pollfd fds[NUM_FD];
 static nfds_t nfds = NUM_FD;
@@ -146,12 +147,50 @@ static SaAisErrorT mqd_clm_init(MQD_CB *cb)
                TRACE_1("saClmClusterTrack success");
        } while (false);
 
-       if (saErr != SA_AIS_OK && !cb->clm_hdl)
+       if (saErr != SA_AIS_OK && cb->clm_hdl)
                saClmFinalize(cb->clm_hdl);
 
        return saErr;
 }
 
+static SaAisErrorT mqd_ntf_init(MQD_CB *cb)
+{
+       SaAisErrorT rc = SA_AIS_OK;
+       TRACE_ENTER();
+
+       do {
+               SaVersionT ntfVersion = { 'A', 1, 1 };
+               SaNtfCallbacksT callbacks = {
+                       mqdNtfNotificationCallback,
+                       0
+               };
+
+               rc = saNtfInitialize(&cb->ntfHandle, &callbacks, &ntfVersion);
+               if (rc != SA_AIS_OK) {
+                       LOG_ER("saNtfInitialize failed with error %u", rc);
+                       break;
+               }
+
+               rc = saNtfSelectionObjectGet(cb->ntfHandle, &cb->ntf_sel_obj);
+               if (SA_AIS_OK != rc) {
+                       LOG_ER("saNtfSelectionObjectGet failed with error %u",
+                              rc);
+                       break;
+               }
+
+               rc = mqdInitNtfSubscriptions(cb->ntfHandle);
+               if (rc != SA_AIS_OK)
+                       break;
+       } while (false);
+
+       if (rc != SA_AIS_OK && cb->ntfHandle)
+               saNtfFinalize(cb->ntfHandle);
+
+       TRACE_LEAVE();
+
+       return rc;
+}
+
 /****************************************************************************\
   PROCEDURE NAME : mqd_lib_init
 
@@ -306,6 +345,16 @@ static uint32_t mqd_lib_init(void)
                mqd_cb_shut(pMqd);
                return NCSCC_RC_FAILURE;
        }
+
+       if (mqd_ntf_init(pMqd) != SA_AIS_OK) {
+               mqd_asapi_unbind();
+               saAmfFinalize(pMqd->amf_hdl);
+               saClmFinalize(pMqd->clm_hdl);
+               ncshm_give_hdl(pMqd->hdl);
+               mqd_cb_shut(pMqd);
+               return NCSCC_RC_FAILURE;
+       }
+
        if ((rc = initialize_for_assignment(pMqd, pMqd->ha_state)) !=
            NCSCC_RC_SUCCESS) {
                LOG_ER("initialize_for_assignment FAILED %u", (unsigned)rc);
@@ -493,6 +542,8 @@ void mqd_main_process(uint32_t hdl)
                fds[FD_CLM].events = POLLIN;
                fds[FD_MBCSV].fd = pMqd->mbcsv_sel_obj;
                fds[FD_MBCSV].events = POLLIN;
+               fds[FD_NTF].fd = pMqd->ntf_sel_obj;
+               fds[FD_NTF].events = POLLIN;
 
                int ret = poll(fds, nfds, -1);
                if (ret == -1) {
@@ -522,6 +573,15 @@ void mqd_main_process(uint32_t hdl)
                                LOG_ER("saClmDispatch failed: %u", err);
                        }
                }
+
+               if (fds[FD_NTF].revents & POLLIN) {
+                       /* dispatch all the NTF pending function */
+                       err = saNtfDispatch(pMqd->ntfHandle, SA_DISPATCH_ALL);
+                       if (err != SA_AIS_OK) {
+                               LOG_ER("saNtfDispatch failed: %u", err);
+                       }
+               }
+
                /* dispatch all the MBCSV pending callbacks */
                if (fds[FD_MBCSV].revents & POLLIN) {
                        mbcsv_arg.i_op = NCS_MBCSV_OP_DISPATCH;
diff --git a/src/msg/msgd/mqd_db.h b/src/msg/msgd/mqd_db.h
index 022b03a..93dc18f 100644
--- a/src/msg/msgd/mqd_db.h
+++ b/src/msg/msgd/mqd_db.h
@@ -34,6 +34,7 @@
 #include <stdbool.h>
 #include <saClm.h>
 #include <saImmOi.h>
+#include <saNtf.h>
 
 #define MQSV_MQD_MBCSV_VERSION 1
 #define MQSV_MQD_MBCSV_VERSION_MIN 1
@@ -91,6 +92,7 @@ typedef struct mqd_qinfo {
   NCS_QUEUE ilist; /* Queue/Group element list */
   NCS_QUEUE tlist; /* List of the user who opted track */
   SaTimeT creationTime;
+  bool capacityReached;
 } MQD_OBJ_INFO;
 
 typedef struct mqd_object_elem {
@@ -146,6 +148,7 @@ typedef struct mqd_cb {
   SaClmHandleT clm_hdl;
   SaAmfHAStateT ha_state; /* Present AMF HA state of the component */
   SaNameT comp_name;
+  SaNtfHandleT ntfHandle;
 
   NCS_MBCSV_HDL mbcsv_hdl;       /*MBCSV handle for Redundancy of initialize  
*/
   NCS_MBCSV_CKPT_HDL o_ckpt_hdl; /*Opened Checkpoint Handle */
@@ -171,6 +174,7 @@ typedef struct mqd_cb {
   SaSelectionObjectT imm_sel_obj; /*Selection object to wait for
                                      IMM events */
   SaSelectionObjectT clm_sel_obj;
+  SaSelectionObjectT ntf_sel_obj;
   bool fully_initialized;
 } MQD_CB;
 
diff --git a/src/msg/msgd/mqd_ntf.cc b/src/msg/msgd/mqd_ntf.cc
new file mode 100644
index 0000000..ab29193
--- /dev/null
+++ b/src/msg/msgd/mqd_ntf.cc
@@ -0,0 +1,310 @@
+/*      -*- OpenSAF  -*-
+ *
+ * (C) Copyright 2017 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
+ * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Genband
+ *
+ */
+#include <cstring>
+#include <vector>
+#include <saMsg.h>
+#include "base/ncs_edu_pub.h"
+#include "base/logtrace.h"
+#include "base/ncs_queue.h"
+#include "base/ncspatricia.h"
+#include "base/ncssysf_tmr.h"
+#include "mbc/mbcsv_papi.h"
+#include "mds/mds_papi.h"
+#include "msg/common/mqsv_def.h"
+#include "msg/common/mqsv_asapi.h"
+#include "mqd_tmr.h"
+#include "mqd_db.h"
+#include "mqd_dl_api.h"
+#include "mqd_ntf.h"
+
+extern MQDLIB_INFO gl_mqdinfo;
+
+static const int operStateSubId = 1;
+
+static void sendStateChangeNotification(MQD_CB *cb,
+                                        const SaNameT& queueGroup,
+                                        SaUint16T status) {
+  TRACE_ENTER();
+
+  do {
+    if (status == SA_MSG_QUEUE_CAPACITY_REACHED)
+      status = SA_MSG_QUEUE_GROUP_CAPACITY_REACHED;
+    else if (status == SA_MSG_QUEUE_CAPACITY_AVAILABLE)
+      status = SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE;
+
+    const SaNameT notifyingObject = {
+      sizeof("safApp=safMsgService") - 1,
+      "safApp=safMsgService"
+    };
+
+    SaNtfStateChangeNotificationT ntfStateChange = { 0 };
+
+    SaAisErrorT rc(saNtfStateChangeNotificationAllocate(cb->ntfHandle,
+                                                        &ntfStateChange,
+                                                        1,
+                                                        0,
+                                                        0,
+                                                        1,
+                                                        0));
+
+    if (rc != SA_AIS_OK) {
+      LOG_ER("saNtfStateChangeNotificationAllocate " "failed: %i", rc);
+      break;
+    }
+
+    *ntfStateChange.notificationHeader.eventType = SA_NTF_OBJECT_STATE_CHANGE;
+
+    memcpy(ntfStateChange.notificationHeader.notificationObject,
+           &queueGroup,
+           sizeof(SaNameT));
+
+    memcpy(ntfStateChange.notificationHeader.notifyingObject,
+           &notifyingObject,
+           sizeof(SaNameT));
+
+    ntfStateChange.notificationHeader.notificationClassId->vendorId =
+      SA_NTF_VENDOR_ID_SAF;
+    ntfStateChange.notificationHeader.notificationClassId->majorId =
+      SA_SVC_MSG;
+    ntfStateChange.notificationHeader.notificationClassId->minorId =
+      (status == SA_MSG_QUEUE_GROUP_CAPACITY_REACHED) ? 0x67 : 0x68;
+
+    *ntfStateChange.sourceIndicator = SA_NTF_OBJECT_OPERATION;
+
+    ntfStateChange.changedStates[0].stateId = SA_MSG_DEST_CAPACITY_STATUS;
+
+    ntfStateChange.changedStates[0].oldStatePresent =
+      (status == SA_MSG_QUEUE_GROUP_CAPACITY_REACHED) ? SA_FALSE : SA_TRUE;
+
+    if (status == SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE) {
+      ntfStateChange.changedStates[0].oldState =
+        SA_MSG_QUEUE_GROUP_CAPACITY_REACHED;
+    }
+
+    ntfStateChange.changedStates[0].newState = status;
+
+    rc = saNtfNotificationSend(ntfStateChange.notificationHandle);
+
+    if (rc != SA_AIS_OK)
+      LOG_ER("saNtfNotificationSend failed: %i", rc);
+
+    rc = saNtfNotificationFree(ntfStateChange.notificationHandle);
+
+    if (rc != SA_AIS_OK)
+      LOG_ER("saNtfNotificationFree failed: %i", rc);
+  } while (false);
+
+  TRACE_LEAVE();
+}
+
+static void updateQueueGroup(MQD_CB *cb,
+                             const SaNameT& queueName,
+                             SaUint16T status) {
+  TRACE_ENTER();
+
+  bool member(false), allFull(true);
+  typedef std::vector<SaNameT> GroupList;
+  GroupList groupList;
+
+  // maybe there is a better way to do this?
+  for (MQD_OBJ_NODE *objNode(reinterpret_cast<MQD_OBJ_NODE *>
+         (ncs_patricia_tree_getnext(&cb->qdb, 0)));
+       objNode;
+       objNode = reinterpret_cast<MQD_OBJ_NODE *>(ncs_patricia_tree_getnext(
+         &cb->qdb,
+         reinterpret_cast<uint8_t *>(&objNode->oinfo.name)))) {
+
+    if (objNode->oinfo.type == MQSV_OBJ_QGROUP) {
+      NCS_Q_ITR itr = { 0 };
+
+      for (MQD_OBJECT_ELEM *objElem(static_cast<MQD_OBJECT_ELEM *>
+             (ncs_queue_get_next(&objNode->oinfo.ilist, &itr)));
+           objElem;
+           objElem = static_cast<MQD_OBJECT_ELEM *>
+             (ncs_queue_get_next(&objNode->oinfo.ilist, &itr))) {
+        if (objElem->pObject->type == MQSV_OBJ_QUEUE) {
+          if (queueName.length == objElem->pObject->name.length &&
+              !memcmp(queueName.value,
+                      objElem->pObject->name.value,
+                      queueName.length)) {
+            member = true;
+            if (status == SA_MSG_QUEUE_CAPACITY_REACHED)
+              objElem->pObject->capacityReached = true;
+            else if (status == SA_MSG_QUEUE_CAPACITY_AVAILABLE)
+              objElem->pObject->capacityReached = false;
+
+            groupList.push_back(objNode->oinfo.name);
+          } else if (!objElem->pObject->capacityReached) {
+            allFull = false;
+          }
+        }
+      }
+    }
+  }
+
+  if (member && ((status == SA_MSG_QUEUE_CAPACITY_REACHED && allFull) ||
+      (status == SA_MSG_QUEUE_CAPACITY_AVAILABLE && allFull))) {
+    for (auto& it : groupList)
+      sendStateChangeNotification(cb, it, status);
+  } else {
+    // don't care about anything else
+  }
+
+  TRACE_LEAVE();
+}
+
+static bool isMSGStateChange(const SaNtfClassIdT &classId) {
+  TRACE_ENTER();
+  bool status(false);
+
+  if (classId.vendorId == SA_NTF_VENDOR_ID_SAF &&
+      classId.majorId == SA_SVC_MSG) {
+    status = true;
+  }
+
+  TRACE_LEAVE2("%i", status);
+  return status;
+}
+
+static void handleMsgObjectStateChangeNotification(
+  MQD_CB *cb,
+  const SaNtfStateChangeNotificationT& stateChangeNotification) {
+  TRACE_ENTER();
+
+  do {
+    if (*stateChangeNotification.sourceIndicator != SA_NTF_OBJECT_OPERATION) {
+      LOG_ER("expected object operation -- got %i",
+             *stateChangeNotification.sourceIndicator);
+      break;
+    }
+
+    if (stateChangeNotification.numStateChanges != 1) {
+      LOG_ER("expected one state change -- got %i",
+             stateChangeNotification.numStateChanges);
+      break;
+    }
+
+    if (stateChangeNotification.changedStates[0].stateId !=
+          SA_MSG_DEST_CAPACITY_STATUS) {
+      LOG_ER("unknown state id: %i",
+             stateChangeNotification.changedStates[0].stateId);
+      break;
+    }
+
+    // if all the queues for a group are full then send a notification
+    updateQueueGroup(
+      cb,
+      *stateChangeNotification.notificationHeader.notificationObject,
+      stateChangeNotification.changedStates[0].newState);
+  } while (false);
+
+  TRACE_LEAVE();
+}
+
+static void handleStateChangeNotification(
+  MQD_CB *cb,
+  const SaNtfStateChangeNotificationT& stateChangeNotification) {
+  TRACE_ENTER();
+
+  if (stateChangeNotification.notificationHeader.notificationClassId &&
+      isMSGStateChange(
+          *stateChangeNotification.notificationHeader.notificationClassId)) {
+    handleMsgObjectStateChangeNotification(cb, stateChangeNotification);
+  } else {
+    TRACE("ignoring non-MSG state change notification");
+  }
+
+  TRACE_LEAVE();
+}
+
+void mqdNtfNotificationCallback(SaNtfSubscriptionIdT subscriptionId,
+                                const SaNtfNotificationsT *notification) {
+  TRACE_ENTER();
+  MQD_CB *cb(static_cast<MQD_CB *>(ncshm_take_hdl(NCS_SERVICE_ID_MQD,
+                                                  gl_mqdinfo.inst_hdl)));
+
+  if (!cb) {
+    LOG_ER("can't get mqd control block");
+  }
+
+  do {
+    if (cb->ha_state != SA_AMF_HA_ACTIVE)
+      break;
+
+    if (subscriptionId != operStateSubId) {
+      TRACE("unknown subscription id received in mqdNtfNotificationCallback: "
+            "%d",
+            subscriptionId);
+      break;
+    }
+
+    if (notification->notificationType == SA_NTF_TYPE_STATE_CHANGE) {
+      handleStateChangeNotification(
+          cb,
+          notification->notification.stateChangeNotification);
+    } else {
+      TRACE("ignoring NTF notification of type: %i",
+            notification->notificationType);
+    }
+  } while (false);
+
+  ncshm_give_hdl(cb->hdl);
+
+  TRACE_LEAVE();
+}
+
+SaAisErrorT mqdInitNtfSubscriptions(SaNtfHandleT ntfHandle) {
+  TRACE_ENTER();
+  SaAisErrorT rc(SA_AIS_OK);
+
+  do {
+    SaNtfStateChangeNotificationFilterT filter;
+
+    rc = saNtfStateChangeNotificationFilterAllocate(
+      ntfHandle,
+      &filter, 0, 0, 0, 0, 0, 0);
+
+    if (rc != SA_AIS_OK) {
+      LOG_ER("saNtfAttributeChangeNotificationFilterAllocate"
+        " failed: %i",
+        rc);
+      break;
+    }
+
+    SaNtfNotificationTypeFilterHandlesT filterHandles = {
+      0, 0, filter.notificationFilterHandle, 0, 0
+    };
+
+    rc = saNtfNotificationSubscribe(&filterHandles,
+      operStateSubId);
+
+    if (rc != SA_AIS_OK) {
+      LOG_ER("saNtfNotificationSubscribe failed: %i", rc);
+      break;
+    }
+
+    rc = saNtfNotificationFilterFree(filter.notificationFilterHandle);
+
+    if (rc != SA_AIS_OK) {
+      LOG_ER("saNtfNotificationFilterFree failed: %i", rc);
+      break;
+    }
+  } while (false);
+
+  TRACE_LEAVE();
+  return rc;
+}
diff --git a/src/msg/msgd/mqd_ntf.h b/src/msg/msgd/mqd_ntf.h
new file mode 100644
index 0000000..4d9c0f1
--- /dev/null
+++ b/src/msg/msgd/mqd_ntf.h
@@ -0,0 +1,49 @@
+/*      -*- OpenSAF  -*-
+ *
+ * (C) Copyright 2017 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
+ * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Genband
+ *
+ */
+
+/*****************************************************************************
+..............................................................................
+
+  FILE NAME: mqd_ntf.h
+
+..............................................................................
+
+  DESCRIPTION:
+
+  MQD Ntf Structures & Parameters.
+
+******************************************************************************/
+
+#ifndef MSG_MSGD_MQD_NTF_H_
+#define MSG_MSGD_MQD_NTF_H_
+
+#include <saNtf.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void mqdNtfNotificationCallback(SaNtfSubscriptionIdT subscriptionId,
+                               const SaNtfNotificationsT *notification);
+
+SaAisErrorT mqdInitNtfSubscriptions(SaNtfHandleT);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/src/msg/msgnd/mqnd_db.h b/src/msg/msgnd/mqnd_db.h
index d04566e..fc7d926 100644
--- a/src/msg/msgnd/mqnd_db.h
+++ b/src/msg/msgnd/mqnd_db.h
@@ -74,6 +74,7 @@ typedef struct mqnd_queue_info {
   SaMsgQueueHandleT listenerHandle; /* Listener queue handle */
   SaNameT queueName;
   SaSizeT size[SA_MSG_MESSAGE_LOWEST_PRIORITY + 1]; /* Size of the queue */
+  SaMsgQueueThresholdsT thresholds;
   SaMsgQueueStatusT queueStatus;        /* Status info of the queue */
   SaMsgQueueSendingStateT sendingState; /* Sending state is removed from B.1.1,
                                            but used internally */
@@ -91,6 +92,8 @@ typedef struct mqnd_queue_info {
   uint32_t numberOfFullErrors[SA_MSG_MESSAGE_LOWEST_PRIORITY + 1];
   SaUint64T totalQueueSize;
   uint32_t shm_queue_index;
+  bool capacityReachedSent;
+  bool capacityAvailableSent;
 } MQND_QUEUE_INFO;
 
 typedef struct mqnd_qhndl_node {
@@ -127,6 +130,7 @@ typedef struct mqnd_queue_ckpt_info {
   SaMsgQueueHandleT listenerHandle; /* Listener queue handle */
   SaNameT queueName;
   SaSizeT size[SA_MSG_MESSAGE_LOWEST_PRIORITY + 1]; /* Size of the queue */
+  SaMsgQueueThresholdsT thresholds;
   SaMsgQueueCreationFlagsT creationFlags;
   SaTimeT creationTime; /* Queue Creation time */
   SaTimeT retentionTime;
@@ -145,6 +149,8 @@ typedef struct mqnd_queue_ckpt_info {
   QueueStatsShm; /* Structure to store queue stats in shared memory */
   MQND_TMR qtransfer_complete_tmr; /* Q Transfer Complete Timer */
   uint32_t valid; /* To verify whether the checkpoint info is valid or not */
+  bool capacityReachedSent;
+  bool capacityAvailableSent;
 } MQND_QUEUE_CKPT_INFO;
 
 typedef struct mqnd_shm_info {
diff --git a/src/msg/msgnd/mqnd_evt.c b/src/msg/msgnd/mqnd_evt.c
index 0c2a612..fcbe4d6 100644
--- a/src/msg/msgnd/mqnd_evt.c
+++ b/src/msg/msgnd/mqnd_evt.c
@@ -42,6 +42,8 @@ static uint32_t mqnd_evt_proc_update_stats_shm(MQND_CB *cb,
                                               MQSV_DSEND_EVT *evt);
 static uint32_t mqnd_evt_proc_cb_dump(void);
 static uint32_t mqnd_evt_proc_ret_time_set(MQND_CB *cb, MQSV_EVT *evt);
+static uint32_t mqnd_evt_proc_cap_set(MQND_CB *, MQSV_EVT *);
+static uint32_t mqnd_evt_proc_cap_get(MQND_CB *, MQSV_EVT *);
 static void mqnd_dump_queue_status(MQND_CB *cb, SaMsgQueueStatusT *queueStatus,
                                   uint32_t offset);
 static void mqnd_dump_timer_info(MQND_TMR tmr);
@@ -186,6 +188,12 @@ static uint32_t mqnd_proc_mqp_req_msg(MQND_CB *cb, 
MQSV_EVT *evt)
        case MQP_EVT_Q_RET_TIME_SET_REQ:
                rc = mqnd_evt_proc_ret_time_set(cb, evt);
                break;
+       case MQP_EVT_CAP_SET_REQ:
+               rc = mqnd_evt_proc_cap_set(cb, evt);
+               break;
+       case MQP_EVT_CAP_GET_REQ:
+               rc = mqnd_evt_proc_cap_get(cb, evt);
+               break;
        default:
                LOG_ER("%s:%u: unrecognized message type: %d", __FILE__,
                       __LINE__, evt->msg.mqp_req.type);
@@ -946,6 +954,191 @@ send_rsp:
 }
 
 /****************************************************************************
+ * Name          : sendStateChangeNotification
+ *
+ * Description   : Send state change notification for queues
+ *
+ * Arguments     : MQND_CB *cb - MQND CB pointer
+ *                 MQND_QUEUE_INFO *qInfo - queue info struct
+ *                 SaMsgMessageCapacityStatusT - status to send
+ *
+ * Return Values : None.
+ *
+ * Notes         : None.
+ *****************************************************************************/
+static void sendStateChangeNotification(MQND_CB *cb,
+                                       MQND_QUEUE_INFO *qInfo,
+                                       SaMsgMessageCapacityStatusT status)
+{
+       SaNtfHandleT ntfHandle = 0;
+
+       do {
+               SaVersionT ntfVersion = { 'A', 1, 1 };
+
+               /* do we need to send it? */
+               if ((status == SA_MSG_QUEUE_CAPACITY_REACHED &&
+                       qInfo->capacityReachedSent) ||
+                       (status == SA_MSG_QUEUE_CAPACITY_AVAILABLE &&
+                       qInfo->capacityAvailableSent)) {
+                       break;
+               }
+
+               SaAisErrorT rc = saNtfInitialize(&ntfHandle, 0, &ntfVersion);
+
+               if (rc != SA_AIS_OK) {
+                       LOG_ER("saNtfInitialize failed: %i", rc);
+                       break;
+               }
+
+               const SaNameT notifyingObject = {
+                       sizeof("safApp=safMsgService") - 1,
+                       "safApp=safMsgService"
+               };
+
+               SaNtfStateChangeNotificationT ntfStateChange = { 0 };
+
+               rc = saNtfStateChangeNotificationAllocate(
+                       ntfHandle,
+                       &ntfStateChange,
+                       1, /* numCorrelated Notifications */
+                       0, /* length addition text */
+                       0, /* num additional info */
+                       1, /* num of state changes */
+                       0); /* variable data size */
+
+               if (rc != SA_AIS_OK) {
+                       LOG_ER("saNtfStateChangeNotificationAllocate "
+                               "failed: %i", rc);
+                       break;
+               }
+
+               *ntfStateChange.notificationHeader.eventType =
+                       SA_NTF_OBJECT_STATE_CHANGE;
+
+               memcpy(ntfStateChange.notificationHeader.notificationObject,
+                               &qInfo->queueName,
+                               sizeof(SaNameT));
+
+               memcpy(ntfStateChange.notificationHeader.notifyingObject,
+                       &notifyingObject,
+                       sizeof(SaNameT));
+
+               ntfStateChange.notificationHeader.notificationClassId->vendorId 
=
+                       SA_NTF_VENDOR_ID_SAF;
+               ntfStateChange.notificationHeader.notificationClassId->majorId =
+                       SA_SVC_MSG;
+               ntfStateChange.notificationHeader.notificationClassId->minorId =
+                       (status == SA_MSG_QUEUE_CAPACITY_REACHED) ?
+                       0x65 : 0x66;
+
+               *ntfStateChange.sourceIndicator = SA_NTF_OBJECT_OPERATION;
+
+               ntfStateChange.changedStates[0].stateId =
+                       SA_MSG_DEST_CAPACITY_STATUS;
+
+               ntfStateChange.changedStates[0].oldStatePresent =
+                       (status == SA_MSG_QUEUE_CAPACITY_REACHED) ?
+                       SA_FALSE : SA_TRUE;
+
+               if (status == SA_MSG_QUEUE_CAPACITY_AVAILABLE) {
+                       ntfStateChange.changedStates[0].oldState =
+                               SA_MSG_QUEUE_CAPACITY_REACHED;
+               }
+
+               ntfStateChange.changedStates[0].newState = status;
+
+               rc = saNtfNotificationSend(ntfStateChange.notificationHandle);
+
+               if (rc != SA_AIS_OK)
+                       LOG_ER("saNtfNotificationSend failed: %i", rc);
+               else {
+                       if (status == SA_MSG_QUEUE_CAPACITY_REACHED) {
+                               qInfo->capacityReachedSent = true;
+                               qInfo->capacityAvailableSent = false;
+                       } else {
+                               qInfo->capacityReachedSent = false;
+                               qInfo->capacityAvailableSent = true;
+                       }
+               }
+
+               rc = saNtfNotificationFree(ntfStateChange.notificationHandle);
+
+               if (rc != SA_AIS_OK)
+                       LOG_ER("saNtfNotificationFree failed: %i", rc);
+       } while (false);
+
+       if (ntfHandle) {
+               SaAisErrorT rc = saNtfFinalize(ntfHandle);
+
+               if (rc != SA_AIS_OK)
+                       LOG_ER("saNtfFinalize failed: %i", rc);
+       }
+}
+
+/****************************************************************************
+ * Name          : checkCapacity
+ *
+ * Description   : Function to check whether we need to send state change
+ *                 notification.
+ *
+ * Arguments     : MQND_CB *cb - MQND CB pointer
+ *                 MQND_QUEUE_INFO *qInfo - queue info
+ *
+ * Return Values : None.
+ *
+ * Notes         : None.
+ *****************************************************************************/
+static void checkCapacity(MQND_CB *cb, MQND_QUEUE_INFO *qInfo)
+{
+       int i;
+       uint32_t offset = qInfo->shm_queue_index;
+       MQND_QUEUE_CKPT_INFO *shm_base_addr = cb->mqnd_shm.shm_base_addr;
+       bool sendNotification = true;
+       SaMsgMessageCapacityStatusT capacityType =
+               SA_MSG_QUEUE_CAPACITY_REACHED;
+
+       TRACE_ENTER();
+
+       /* send capacity notifications */
+       for (i = SA_MSG_MESSAGE_HIGHEST_PRIORITY;
+               i <= SA_MSG_MESSAGE_LOWEST_PRIORITY;
+               i++)
+       {
+               TRACE("capacityReached[%i]: %llu capacityAvailable: %llu "
+                               "queueUsed: %llu",
+                               i,
+                               qInfo->thresholds.capacityReached[i],
+                               qInfo->thresholds.capacityAvailable[i],
+                               shm_base_addr[offset].QueueStatsShm.
+                                       saMsgQueueUsage[i].queueUsed);
+               if (!qInfo->capacityReachedSent &&
+                       qInfo->thresholds.capacityReached[i] >=
+                               shm_base_addr[offset].QueueStatsShm.
+                               saMsgQueueUsage[i].queueUsed) {
+                       /* only send notification if all are full */
+                       sendNotification = false;
+                       TRACE("not sending notification");
+                       break;
+               } else if (!qInfo->capacityAvailableSent &&
+                               qInfo->thresholds.capacityAvailable[i] >
+                                       shm_base_addr[offset].QueueStatsShm.
+                                       saMsgQueueUsage[i].queueUsed) {
+                       /* send notification if at least one comes available */
+                       TRACE("sending available notification");
+                       sendNotification = true;
+                       capacityType = SA_MSG_QUEUE_CAPACITY_AVAILABLE;
+                       break;
+               }
+       }
+
+       TRACE("sendNotification: %i", sendNotification);
+       if (sendNotification)
+               sendStateChangeNotification(cb, qInfo, capacityType);
+
+       TRACE_LEAVE();
+}
+
+/****************************************************************************
  * Name          : mqnd_evt_proc_update_stats_shm
  *
  * Description   : Function to update stats of queue in shm when message is
@@ -998,7 +1191,10 @@ static uint32_t mqnd_evt_proc_update_stats_shm(MQND_CB 
*cb, MQSV_DSEND_EVT *evt)
 
        statsReq = &evt->info.statsReq;
 
-       if (cb->is_restart_done)
+       if (!cb->clm_node_joined) {
+               err = SA_AIS_ERR_UNAVAILABLE;
+               goto done;
+       } else if (cb->is_restart_done)
                err = SA_AIS_OK;
        else {
                err = SA_AIS_ERR_TRY_AGAIN;
@@ -1039,6 +1235,8 @@ static uint32_t mqnd_evt_proc_update_stats_shm(MQND_CB 
*cb, MQSV_DSEND_EVT *evt)
                goto done;
        }
 
+       checkCapacity(cb, &qnode->qinfo);
+
 done:
 
        direct_rsp_evt =
@@ -1292,6 +1490,9 @@ static uint32_t mqnd_evt_proc_send_msg(MQND_CB *cb, 
MQSV_DSEND_EVT *evt)
 
        mqnd_send_msg_update_stats_shm(cb, qnode, snd_msg->message.size,
                                       snd_msg->message.priority);
+
+       checkCapacity(cb, &qnode->qinfo);
+
 send_resp:
        /* If the error happens in SendReceive case while sending the message
           then return the response from here and don't wait for the reply
@@ -1741,6 +1942,162 @@ send_rsp:
 }
 
 /****************************************************************************
+ * Name          : mqnd_evt_proc_cap_set
+ *
+ * Description   : Function to set capacity thresholds of queue
+ *
+ * Arguments     :
+ *
+ * Return Values : NCSCC_RC_SUCCESS/Error.
+ *
+ * Notes         : None.
+ *****************************************************************************/
+static uint32_t mqnd_evt_proc_cap_set(MQND_CB *cb, MQSV_EVT *evt)
+{
+       SaAisErrorT err = SA_AIS_OK;
+       MQSV_EVT rsp_evt;
+       uint32_t rc = NCSCC_RC_SUCCESS;
+       TRACE_ENTER();
+
+       do {
+               MQND_QUEUE_NODE *qnode = NULL;
+               int i;
+
+               if (!cb->clm_node_joined) {
+                       err = SA_AIS_ERR_UNAVAILABLE;
+                       break;
+               } else if (cb->is_restart_done) {
+                       err = SA_AIS_OK;
+               } else {
+                       LOG_ER("%s:%u: ERR_TRY_AGAIN: MQND is not completely "
+                                       "Initialized",
+                                       __FILE__, __LINE__);
+                       err = SA_AIS_ERR_TRY_AGAIN;
+                       break;
+               }
+
+               mqnd_queue_node_get(cb,
+                               evt->msg.mqp_req.info.retTimeSetReq.queueHandle,
+                               &qnode);
+
+               /* If queue not found */
+               if (!qnode) {
+                       LOG_ER("ERR_BAD_HANDLE: Get queue node Failed");
+                       err = SA_AIS_ERR_BAD_HANDLE;
+                       break;
+               }
+
+               /*
+                * capacityAvailable and capacityReached have already been
+                * checked in the agent with regards to each other
+                */
+               for (i = SA_MSG_MESSAGE_HIGHEST_PRIORITY;
+                               i <= SA_MSG_MESSAGE_LOWEST_PRIORITY;
+                               i++)
+               {
+                       if (qnode->qinfo.size[i] <
+                               
evt->msg.mqp_req.info.capacity.thresholds.capacityReached[i]) {
+                               TRACE("size is less than capacity reached");
+                               err = SA_AIS_ERR_INVALID_PARAM;
+                               break;
+                       }
+               }
+
+               if (err != SA_AIS_OK)
+                       break;
+
+               qnode->qinfo.thresholds =
+                       evt->msg.mqp_req.info.capacity.thresholds;
+       } while (false);
+
+       /*Send the resp to MQA */
+       memset(&rsp_evt, 0, sizeof(MQSV_EVT));
+
+       rsp_evt.type = MQSV_EVT_MQP_RSP;
+       rsp_evt.msg.mqp_rsp.type = MQP_EVT_CAP_SET_RSP;
+       rsp_evt.msg.mqp_rsp.error = err;
+
+       rc = mqnd_mds_send_rsp(cb, &evt->sinfo, &rsp_evt);
+
+       if (rc != NCSCC_RC_SUCCESS)
+               LOG_ER(
+                   "Queue Capacity set :Mds Send Response Failed %" PRIx64,
+                   evt->sinfo.dest);
+       else
+               TRACE_1("Queue Capacity set: Mds Send Response Success");
+
+       TRACE_LEAVE2("Returned with return code %u", rc);
+       return rc;
+}
+
+/****************************************************************************
+ * Name          : mqnd_evt_proc_cap_get
+ *
+ * Description   : Function to get capacity thresholds of queue
+ *
+ * Arguments     :
+ *
+ * Return Values : NCSCC_RC_SUCCESS/Error.
+ *
+ * Notes         : None.
+ *****************************************************************************/
+static uint32_t mqnd_evt_proc_cap_get(MQND_CB *cb, MQSV_EVT *evt)
+{
+       SaAisErrorT err = SA_AIS_OK;
+       MQSV_EVT rsp_evt;
+       MQND_QUEUE_NODE *qnode = NULL;
+       uint32_t rc = NCSCC_RC_SUCCESS;
+
+       TRACE_ENTER();
+
+       do {
+               if (!cb->clm_node_joined) {
+                       err = SA_AIS_ERR_UNAVAILABLE;
+                       break;
+               } else if (cb->is_restart_done) {
+                       err = SA_AIS_OK;
+               } else {
+                       LOG_ER("%s:%u: ERR_TRY_AGAIN: MQND is not completely "
+                                       "Initialized",
+                                       __FILE__, __LINE__);
+                       err = SA_AIS_ERR_TRY_AGAIN;
+                       break;
+               }
+
+               mqnd_queue_node_get(cb,
+                               evt->msg.mqp_req.info.retTimeSetReq.queueHandle,
+                               &qnode);
+
+               /* If queue not found */
+               if (!qnode) {
+                       LOG_ER("ERR_BAD_HANDLE: Get queue node Failed");
+                       err = SA_AIS_ERR_BAD_HANDLE;
+                       break;
+               }
+       } while (false);
+
+       /*Send the resp to MQA */
+       memset(&rsp_evt, 0, sizeof(MQSV_EVT));
+
+       rsp_evt.type = MQSV_EVT_MQP_RSP;
+       rsp_evt.msg.mqp_rsp.type = MQP_EVT_CAP_GET_RSP;
+       rsp_evt.msg.mqp_rsp.error = err;
+       rsp_evt.msg.mqp_rsp.info.capacity.thresholds = qnode->qinfo.thresholds;
+
+       rc = mqnd_mds_send_rsp(cb, &evt->sinfo, &rsp_evt);
+
+       if (rc != NCSCC_RC_SUCCESS)
+               LOG_ER(
+                   "Queue Capacity get :Mds Send Response Failed %" PRIx64,
+                   evt->sinfo.dest);
+       else
+               TRACE_1("Queue Capacity get: Mds Send Response Success");
+
+       TRACE_LEAVE2("Returned with return code %u", rc);
+       return rc;
+}
+
+/****************************************************************************
  * Name          : mqnd_evt_proc_cb_dump
  *
  * Description   : Function to print the control block infomration
diff --git a/src/msg/msgnd/mqnd_imm.c b/src/msg/msgnd/mqnd_imm.c
index 349790d..feb4aa4 100644
--- a/src/msg/msgnd/mqnd_imm.c
+++ b/src/msg/msgnd/mqnd_imm.c
@@ -460,7 +460,6 @@ SaAisErrorT 
mqnd_create_runtime_MsgQPriorityobject(SaStringT rname,
                                                   SaImmOiHandleT immOiHandle)
 {
        SaAisErrorT rc = SA_AIS_OK;
-       SaUint64T def_val = 0;
        int i = 0;
        SaImmAttrValueT arr1[1], arr2[1], arr3[1], arr4[1];
        SaImmAttrValuesT_2 attr_mqprio, attr_mqprioSize, attr_capavail,
@@ -484,8 +483,8 @@ SaAisErrorT 
mqnd_create_runtime_MsgQPriorityobject(SaStringT rname,
 
                arr1[0] = &mqprdn;
                arr2[0] = &(qnode->qinfo.size[i]);
-               arr3[0] = &def_val; /* not implemented */
-               arr4[0] = &def_val; /* not implemented */
+               arr3[0] = &qnode->qinfo.thresholds.capacityAvailable[i];
+               arr4[0] = &qnode->qinfo.thresholds.capacityReached[i];
 
                attr_mqprio.attrName = "safMqPrio";
                attr_mqprio.attrValueType = SA_IMM_ATTR_SASTRINGT;
diff --git a/src/msg/msgnd/mqnd_util.c b/src/msg/msgnd/mqnd_util.c
index 8dd79fb..10c78f5 100644
--- a/src/msg/msgnd/mqnd_util.c
+++ b/src/msg/msgnd/mqnd_util.c
@@ -79,6 +79,10 @@ uint32_t mqnd_queue_create(MQND_CB *cb, MQP_OPEN_REQ *open, 
MDS_DEST *rcvr_mqa,
             i <= SA_MSG_MESSAGE_LOWEST_PRIORITY; i++) {
                qnode->qinfo.totalQueueSize += open->creationAttributes.size[i];
                qnode->qinfo.size[i] = open->creationAttributes.size[i];
+               qnode->qinfo.thresholds.capacityReached[i] =
+                       qnode->qinfo.size[i];
+               qnode->qinfo.thresholds.capacityAvailable[i] =
+                       qnode->qinfo.size[i];
                qnode->qinfo.queueStatus.saMsgQueueUsage[i].queueSize =
                    open->creationAttributes.size[i];
        }
-- 
2.9.5


------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to