When ntfd receives an asynchronous checkpoint from its peer,
it decodes and processes the checkpoint before sending the response
to the peer. If processing the checkpoint takes a long time, it will
hang the active ntf. The solution is to push the checkpoint into
a queue and then process it later.
---
src/ntf/ntfd/ntfs_cb.h | 2 ++
src/ntf/ntfd/ntfs_evt.c | 48 +++++++++++++++++++++++++++++++++++++++
src/ntf/ntfd/ntfs_evt.h | 1 +
src/ntf/ntfd/ntfs_mbcsv.c | 40 ++++++++++++++++++++++++--------
src/ntf/ntfd/ntfs_mbcsv.h | 2 ++
5 files changed, 84 insertions(+), 9 deletions(-)
diff --git a/src/ntf/ntfd/ntfs_cb.h b/src/ntf/ntfd/ntfs_cb.h
index 96eedc171..c21581a65 100644
--- a/src/ntf/ntfd/ntfs_cb.h
+++ b/src/ntf/ntfd/ntfs_cb.h
@@ -18,6 +18,7 @@
#ifndef NTF_NTFD_NTFS_CB_H_
#define NTF_NTFD_NTFS_CB_H_
+#include "base/ncs_queue.h"
#include <stdbool.h>
#include <saNtf.h>
#include <saClm.h>
@@ -71,6 +72,7 @@ typedef struct ntfs_cb {
NCS_SEL_OBJ usr2_sel_obj; /* Selection object for CLM initialization.*/
uint16_t peer_mbcsv_version; /*Remeber peer NTFS MBCSV version.*/
bool clm_initialized; // For CLM init status;
+ NCS_QUEUE async_ckpt_queue;
} ntfs_cb_t;
extern uint32_t ntfs_cb_init(ntfs_cb_t *);
diff --git a/src/ntf/ntfd/ntfs_evt.c b/src/ntf/ntfd/ntfs_evt.c
index 8c73f430d..1162eec78 100644
--- a/src/ntf/ntfd/ntfs_evt.c
+++ b/src/ntf/ntfd/ntfs_evt.c
@@ -36,6 +36,7 @@
static uint32_t process_api_evt(ntfsv_ntfs_evt_t *evt);
static uint32_t proc_ntfa_updn_mds_msg(ntfsv_ntfs_evt_t *evt);
static uint32_t proc_mds_quiesced_ack_msg(ntfsv_ntfs_evt_t *evt);
+static uint32_t process_async_ckpt_evt();
static uint32_t proc_initialize_msg(ntfs_cb_t *, ntfsv_ntfs_evt_t *evt);
static uint32_t proc_finalize_msg(ntfs_cb_t *, ntfsv_ntfs_evt_t *evt);
static uint32_t proc_subscribe_msg(ntfs_cb_t *, ntfsv_ntfs_evt_t *evt);
@@ -163,6 +164,36 @@ static uint32_t proc_mds_quiesced_ack_msg(ntfsv_ntfs_evt_t
*evt)
return NCSCC_RC_SUCCESS;
}
+/****************************************************************************
+ * Name : process_async_ckpt_evt
+ *
+ * Description : While standby, dequeue one checkpoint from async_ckpt_queue,
+ * apply it, bump async_upd_cnt and free it. Other HA states do nothing.
+ * Return Values : Result of ntfs_mbcsv_process_ckpt_data() if a checkpoint
+ * was dequeued, otherwise NCSCC_RC_SUCCESS.
+ * Notes : While active the event is only traced; the queue is left untouched.
+ *****************************************************************************/
+static uint32_t process_async_ckpt_evt()
+{
+ uint32_t rc = NCSCC_RC_SUCCESS;
+ TRACE_ENTER();
+ if (ntfs_cb->ha_state == SA_AMF_HA_ACTIVE) {
+ TRACE("Unexpectly received a checkpoint event while active."
+ " Skipped");
+ } else if (ntfs_cb->ha_state == SA_AMF_HA_STANDBY) {
+ ntfsv_ckpt_msg_t *ckpt_msg = ncs_dequeue(
+ &ntfs_cb->async_ckpt_queue);
+ if(ckpt_msg != NULL) {
+ rc = ntfs_mbcsv_process_ckpt_data(ntfs_cb, ckpt_msg);
+ /* Count the update even if processing failed (rc is not checked) */
+ ntfs_cb->async_upd_cnt++;
+ free(ckpt_msg);
+ }
+ }
+ TRACE_LEAVE();
+ return rc;
+}
+
/****************************************************************************
* Name : proc_rda_cb_msg
*
@@ -191,6 +222,20 @@ static uint32_t proc_rda_cb_msg(ntfsv_ntfs_evt_t *evt)
ntfs_cb->ha_state != SA_AMF_HA_ACTIVE) {
SaAmfHAStateT old_ha_state = ntfs_cb->ha_state;
LOG_NO("ACTIVE request");
+ if (old_ha_state == SA_AMF_HA_STANDBY) {
+ // Process pending async checkpoint if any
+ ntfsv_ckpt_msg_t *ckpt_msg = NULL;
+ TRACE("Process pending async checkpoint");
+ while ((ckpt_msg = ncs_dequeue(
+ &ntfs_cb->async_ckpt_queue))
+ != NULL) {
+ ntfs_mbcsv_process_ckpt_data(ntfs_cb,
+ ckpt_msg);
+ /* Update the Async Update Count at standby */
+ ntfs_cb->async_upd_cnt++;
+ free(ckpt_msg);
+ }
+ }
ntfs_cb->mds_role = V_DEST_RL_ACTIVE;
if ((rc = ntfs_mds_change_role()) != NCSCC_RC_SUCCESS) {
@@ -247,6 +292,7 @@ uint32_t ntfs_cb_init(ntfs_cb_t *ntfs_cb)
ntfs_cb->clm_hdl = 0;
ntfs_cb->clm_initialized = false;
ntfs_cb->clmSelectionObject = -1;
+ ncs_create_queue(&ntfs_cb->async_ckpt_queue);
tmp = (char *)getenv("NTFSV_ENV_CACHE_SIZE");
if (tmp) {
@@ -730,6 +776,8 @@ void ntfs_process_mbx(SYSF_MBX *mbx)
}
if (msg->evt_type == NTFSV_EVT_RDA) {
proc_rda_cb_msg(msg);
+ } else if (msg->evt_type == NTFSV_EVT_ASYNC_CKPT) {
+ process_async_ckpt_evt();
}
}
diff --git a/src/ntf/ntfd/ntfs_evt.h b/src/ntf/ntfd/ntfs_evt.h
index bc7495932..6fa845aa1 100644
--- a/src/ntf/ntfd/ntfs_evt.h
+++ b/src/ntf/ntfd/ntfs_evt.h
@@ -26,6 +26,7 @@ typedef enum ntfsv_ntfs_evt_type {
NTFSV_NTFS_EVT_NTFA_DOWN = 3,
NTFSV_EVT_QUIESCED_ACK = 4,
NTFSV_EVT_RDA = 5,
+ NTFSV_EVT_ASYNC_CKPT = 6,
NTFSV_NTFS_EVT_MAX
} NTFSV_NTFS_EVT_TYPE;
diff --git a/src/ntf/ntfd/ntfs_mbcsv.c b/src/ntf/ntfd/ntfs_mbcsv.c
index ed4c90c6d..0781e02b5 100644
--- a/src/ntf/ntfd/ntfs_mbcsv.c
+++ b/src/ntf/ntfd/ntfs_mbcsv.c
@@ -87,7 +87,6 @@ static uint32_t ckpt_peer_info_cbk_handler(NCS_MBCSV_CB_ARG
*arg);
static uint32_t ckpt_notify_cbk_handler(NCS_MBCSV_CB_ARG *arg);
static uint32_t ckpt_err_ind_cbk_handler(NCS_MBCSV_CB_ARG *arg);
-static uint32_t process_ckpt_data(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data);
static void ntfs_exit(const char *msg, SaAmfRecommendedRecoveryT rec_rcvr);
static NTFS_CKPT_HDLR ckpt_data_handler[NTFS_CKPT_MSG_MAX] = {
@@ -967,12 +966,35 @@ static uint32_t ckpt_decode_async_update(ntfs_cb_t *cb,
break;
} /*end switch */
if (rc == NCSCC_RC_SUCCESS) {
- rc = process_ckpt_data(cb, ckpt_msg);
- /* Update the Async Update Count at standby */
- cb->async_upd_cnt++;
+ // Allocate a mailbox message
+ ntfsv_ntfs_evt_t *mbx_evt = calloc(1,
+ sizeof(ntfsv_ntfs_evt_t));
+ if (!mbx_evt) {
+ rc = NCSCC_RC_FAILURE;
+ TRACE("Failed to allocate memory for mailbox event");
+ goto done;
+ }
+ // Put checkpoint to queue. The checkpoint will be freed later
+ // after processing it.
+ ncs_enqueue(&cb->async_ckpt_queue, (void *)ckpt_msg);
+ ckpt_msg = NULL;
+ // Send an async checkpoint event to mailbox. Don't process
+ // it here because the active ntf is waiting for the response.
+ // ntf will check the async checkpoint queue and
+ // process it after receive this event in mailbox.
+ mbx_evt->evt_type = NTFSV_EVT_ASYNC_CKPT;
+ rc = ncs_ipc_send(&ntfs_cb->mbx, (NCS_IPC_MSG *) mbx_evt,
+ NCS_IPC_PRIORITY_NORMAL);
+ if (rc != NCSCC_RC_SUCCESS) {
+ LOG_ER("IPC send failed %d", rc);
+ free(mbx_evt);
+ goto done;
+ }
}
done:
- free(ckpt_msg);
+ if (ckpt_msg) {
+ free(ckpt_msg);
+ }
TRACE_LEAVE();
return rc;
/* if failure, should an indication be sent to active ? */
@@ -1114,7 +1136,7 @@ static uint32_t ckpt_decode_cold_sync(ntfs_cb_t *cb,
NCS_MBCSV_CB_ARG *cbk_arg)
goto done;
}
/* Update our database */
- rc = process_ckpt_data(cb, data);
+ rc = ntfs_mbcsv_process_ckpt_data(cb, data);
if (rc != NCSCC_RC_SUCCESS) {
goto done;
}
@@ -1324,7 +1346,7 @@ done:
}
/****************************************************************************
- * Name : process_ckpt_data
+ * Name : ntfs_mbcsv_process_ckpt_data
*
* Description : This function updates the ntfs internal databases
* based on the data type.
@@ -1338,7 +1360,7 @@ done:
* Notes : None.
*****************************************************************************/
-static uint32_t process_ckpt_data(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data)
+uint32_t ntfs_mbcsv_process_ckpt_data(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data)
{
uint32_t rc = NCSCC_RC_SUCCESS;
if ((!cb) || (data == NULL)) {
@@ -1359,7 +1381,7 @@ static uint32_t process_ckpt_data(ntfs_cb_t *cb,
ntfsv_ckpt_msg_t *data)
} else {
return (rc = NCSCC_RC_FAILURE);
}
-} /*End ntfs_process_ckpt_data() */
+} /*End ntfs_mbcsv_process_ckpt_data() */
/****************************************************************************
* Name : ckpt_proc_reg_rec
diff --git a/src/ntf/ntfd/ntfs_mbcsv.h b/src/ntf/ntfd/ntfs_mbcsv.h
index 3286ec855..1a1f5c403 100644
--- a/src/ntf/ntfd/ntfs_mbcsv.h
+++ b/src/ntf/ntfd/ntfs_mbcsv.h
@@ -103,6 +103,7 @@ typedef struct {
} ntfsv_ckpt_header_t;
typedef struct {
+ NCS_QELEM qelem;
ntfsv_ckpt_header_t header;
union {
ntfs_ckpt_reg_msg_t reg_rec;
@@ -130,5 +131,6 @@ void update_standby(ntfsv_ckpt_msg_t *ckpt, uint32_t
action);
uint32_t enc_ckpt_reserv_header(NCS_UBAID *uba, ntfsv_ckpt_msg_type_t type,
uint32_t num_rec, uint32_t len);
uint32_t enc_mbcsv_client_msg(NCS_UBAID *uba, ntfs_ckpt_reg_msg_t *param);
+uint32_t ntfs_mbcsv_process_ckpt_data(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data);
#endif // NTF_NTFD_NTFS_MBCSV_H_
--
2.25.1
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel