src/amf/amfd/chkop.cc    |   8 +++++++-
 src/amf/amfd/ckpt.h      |   3 ++-
 src/amf/amfd/ckpt_dec.cc |  15 ++++++++++++++-
 src/amf/amfd/ckpt_enc.cc |  13 ++++++++++++-
 src/amf/amfd/ckpt_msg.h  |   1 +
 src/amf/amfd/imm.cc      |  31 ++++++++++---------------------
 src/amf/amfd/imm.h       |   1 +
 src/amf/amfd/main.cc     |  10 +++++++++-
 8 files changed, 56 insertions(+), 26 deletions(-)


The standby AMFD keeps at most 500 jobs related to IMM updates in its queue.
Whenever the queue reaches this size limit, it is trimmed to 250 jobs.

This patch removes that limitation. The active AMFD now asks the standby AMFD,
through MBCSv checkpointing, to flush its job queue a) when the active AMFD
finishes all IMM runtime updates and its own job queue becomes empty, and
b) when the cold sync state of the standby changes from OUT_OF_SYNC to IN_SYNC,
i.e. when the standby joins the cluster for the first time.

diff --git a/src/amf/amfd/chkop.cc b/src/amf/amfd/chkop.cc
--- a/src/amf/amfd/chkop.cc
+++ b/src/amf/amfd/chkop.cc
@@ -1051,7 +1051,13 @@ uint32_t avsv_send_ckpt_data(AVD_CL_CB *
                }
                cb->async_updt_cnt.ng_updt++;
                break;
-
+       case AVSV_CKPT_AVD_IMM_JOB_QUEUE_STATUS:
+               if ((avd_cb->other_avd_adest != 0) && (avd_cb->avd_peer_ver < 
AVD_MBCSV_SUB_PART_VERSION_8)) {
+                       TRACE("No ckpt for  AVSV_CKPT_AVD_IMM_JOB_QUEUE_STATUS 
as peer AMFD has"
+                                       " lower version:%d", 
avd_cb->avd_peer_ver);
+                       return NCSCC_RC_SUCCESS;
+               }
+               break;
        default:
                return NCSCC_RC_SUCCESS;
        }
diff --git a/src/amf/amfd/ckpt.h b/src/amf/amfd/ckpt.h
--- a/src/amf/amfd/ckpt.h
+++ b/src/amf/amfd/ckpt.h
@@ -34,9 +34,10 @@
 #define AMF_AMFD_CKPT_H_
 
 // current version
-#define AVD_MBCSV_SUB_PART_VERSION      7
+#define AVD_MBCSV_SUB_PART_VERSION      8
 
 // supported versions
+#define AVD_MBCSV_SUB_PART_VERSION_8    8
 #define AVD_MBCSV_SUB_PART_VERSION_7    7
 #define AVD_MBCSV_SUB_PART_VERSION_6    6
 #define AVD_MBCSV_SUB_PART_VERSION_5    5
diff --git a/src/amf/amfd/ckpt_dec.cc b/src/amf/amfd/ckpt_dec.cc
--- a/src/amf/amfd/ckpt_dec.cc
+++ b/src/amf/amfd/ckpt_dec.cc
@@ -100,6 +100,7 @@ static uint32_t dec_cs_siass(AVD_CL_CB *
 static uint32_t dec_cs_si_trans(AVD_CL_CB *cb, NCS_MBCSV_CB_DEC *dec, uint32_t 
num_of_obj);
 static uint32_t dec_cs_async_updt_cnt(AVD_CL_CB *cb, NCS_MBCSV_CB_DEC *dec, 
uint32_t num_of_obj);
 static uint32_t dec_cs_comp_cs_type_config(AVD_CL_CB *cb, NCS_MBCSV_CB_DEC 
*dec, uint32_t num_of_obj);
+static uint32_t dec_avd_to_avd_job_queue_status(AVD_CL_CB *cb, 
NCS_MBCSV_CB_DEC *dec);
 
 /*
  * Function list for decoding the async data.
@@ -175,7 +176,9 @@ const AVSV_DECODE_CKPT_DATA_FUNC_PTR avd
        nullptr,                        /* AVSV_SYNC_COMMIT */
        dec_su_restart_count,
        dec_si_dep_state,
-       dec_ng_admin_state
+       dec_ng_admin_state,
+       dec_avd_to_avd_job_queue_status
+
 };
 
 /*
@@ -2972,3 +2975,13 @@ static uint32_t dec_ng_admin_state(AVD_C
        return NCSCC_RC_SUCCESS;
 }
 
+static uint32_t dec_avd_to_avd_job_queue_status(AVD_CL_CB *cb, 
NCS_MBCSV_CB_DEC *dec) {
+  TRACE_ENTER();
+  uint32_t size = 1;
+  osaf_decode_uint32(&dec->i_uba, &size);
+  Fifo::trim_to_size(size);
+  TRACE_LEAVE();
+  return NCSCC_RC_SUCCESS;
+}
+
+
diff --git a/src/amf/amfd/ckpt_enc.cc b/src/amf/amfd/ckpt_enc.cc
--- a/src/amf/amfd/ckpt_enc.cc
+++ b/src/amf/amfd/ckpt_enc.cc
@@ -99,6 +99,7 @@ static uint32_t enc_cs_siass(AVD_CL_CB *
 static uint32_t enc_cs_si_trans(AVD_CL_CB *cb, NCS_MBCSV_CB_ENC *enc, uint32_t 
*num_of_obj);
 static uint32_t enc_cs_async_updt_cnt(AVD_CL_CB *cb, NCS_MBCSV_CB_ENC *enc, 
uint32_t *num_of_obj);
 static uint32_t enc_cs_comp_cs_type_config(AVD_CL_CB *cb, NCS_MBCSV_CB_ENC 
*enc, uint32_t *num_of_obj);
+static uint32_t enc_avd_to_avd_job_queue_status(AVD_CL_CB *cb, 
NCS_MBCSV_CB_ENC *enc);
 
 static uint32_t enc_su_oper_list(AVD_CL_CB *cb, AVD_SG *sg, NCS_MBCSV_CB_ENC 
*enc);
 /*
@@ -175,7 +176,8 @@ const AVSV_ENCODE_CKPT_DATA_FUNC_PTR avd
        nullptr,                        /* AVSV_SYNC_COMMIT */
        enc_su_restart_count,
        enc_si_dep_state,
-       enc_ng_admin_state
+       enc_ng_admin_state,
+       enc_avd_to_avd_job_queue_status
 };
 
 /*
@@ -2493,4 +2495,13 @@ static uint32_t enc_ng_admin_state(AVD_C
         return NCSCC_RC_SUCCESS;
 }
 
+static uint32_t enc_avd_to_avd_job_queue_status(AVD_CL_CB *cb, 
NCS_MBCSV_CB_ENC *enc) {
+  TRACE_ENTER();
+  osafassert(NCS_MBCSV_ACT_UPDATE == enc->io_action);
+  const uint32_t *size = reinterpret_cast<uint32_t*>(enc->io_reo_hdl);
+  osaf_encode_uint32(&enc->io_uba, *size);
+  TRACE_LEAVE();
+  return NCSCC_RC_SUCCESS;
+}
 
+
diff --git a/src/amf/amfd/ckpt_msg.h b/src/amf/amfd/ckpt_msg.h
--- a/src/amf/amfd/ckpt_msg.h
+++ b/src/amf/amfd/ckpt_msg.h
@@ -111,6 +111,7 @@ typedef enum avsv_ckpt_msg_reo_type {
        AVSV_CKPT_SU_RESTART_COUNT,
        AVSV_CKPT_SI_DEP_STATE,
        AVSV_CKPT_NG_ADMIN_STATE,
+       AVSV_CKPT_AVD_IMM_JOB_QUEUE_STATUS,
        AVSV_CKPT_MSG_MAX
 } AVSV_CKPT_MSG_REO_TYPE;
 
diff --git a/src/amf/amfd/imm.cc b/src/amf/amfd/imm.cc
--- a/src/amf/amfd/imm.cc
+++ b/src/amf/amfd/imm.cc
@@ -126,7 +126,6 @@ static char *StrDup(const char *s)
        std::strcpy(c,s);
        return c;
 }
-uint32_t const MAX_JOB_SIZE_AT_STANDBY = 500;
 
 //
 Job::~Job()
@@ -381,24 +380,11 @@ Job* Fifo::dequeue()
        
        return tmp;
 }
-
-/**
- * @brief   As of now standby AMFD will maintain immjobs for object of few 
classes.
- *          Flush all the jobs without updating to imm if 
MAX_JOB_SIZE_AT_STANDBY is 
- *         reached. 
- *
- */
-void check_and_flush_job_queue_standby_amfd(void)
-{
-       TRACE_ENTER();
-
-       if (Fifo::size() >= MAX_JOB_SIZE_AT_STANDBY) {
-               const uint32_t new_size = MAX_JOB_SIZE_AT_STANDBY / 2;
-               LOG_WA("Reducing job queue of size:%u to 
%u",Fifo::size(),new_size);
-               Fifo::trim_to_size(new_size);
-       }
-
-       TRACE_LEAVE();
+void ckpt_job_queue_size() {
+  uint32_t size = Fifo::size();
+  TRACE_ENTER2("Fifo size:%u",size);
+  m_AVSV_SEND_CKPT_UPDT_ASYNC_UPDT(avd_cb, &size, 
AVSV_CKPT_AVD_IMM_JOB_QUEUE_STATUS);
+  TRACE_LEAVE();
 }
 
 //
@@ -412,7 +398,6 @@ AvdJobDequeueResultT Fifo::execute(const
                return JOB_ETRYAGAIN;
 
        if ((!avd_cb->is_implementer) && (avd_cb->avail_state_avd == 
SA_AMF_HA_STANDBY)) {
-               check_and_flush_job_queue_standby_amfd();
                return JOB_EINVH;
        }
 
@@ -426,6 +411,10 @@ AvdJobDequeueResultT Fifo::execute(const
        TRACE_ENTER();
 
        ret = ajob->exec(cb);
+
+       //If no jobs then send a ckpt update to standby to flush its job queue.
+       if (((ajob = peek()) == nullptr) && (cb->stby_sync_state == 
AVD_STBY_IN_SYNC))
+               ckpt_job_queue_size();
        
        TRACE_LEAVE2("%d", ret);
 
@@ -477,7 +466,7 @@ void Fifo::trim_to_size(const uint32_t s
 {
        Job *ajob;
 
-       TRACE_ENTER();
+       TRACE_ENTER2("My Fifo size:%lu, trim to size:%u", job_.size(), size);
 
        while (job_.size() > size && (ajob = dequeue()) != nullptr) {
                delete ajob;
diff --git a/src/amf/amfd/imm.h b/src/amf/amfd/imm.h
--- a/src/amf/amfd/imm.h
+++ b/src/amf/amfd/imm.h
@@ -208,5 +208,6 @@ void report_admin_op_error(SaImmOiHandle
                struct admin_oper_cbk *pend_cbk,
                const char *format, ...) __attribute__ ((format(printf, 5, 6)));
 extern void check_and_flush_job_queue_standby_amfd(void);
+void ckpt_job_queue_size();
 
 #endif  // AMF_AMFD_IMM_H_
diff --git a/src/amf/amfd/main.cc b/src/amf/amfd/main.cc
--- a/src/amf/amfd/main.cc
+++ b/src/amf/amfd/main.cc
@@ -621,6 +621,7 @@ static void main_loop(void)
        AVD_EVT *evt;
        NCS_SEL_OBJ mbx_fd;
        SaAisErrorT error = SA_AIS_OK;
+       AVD_STBY_SYNC_STATE old_sync_state = cb->stby_sync_state;
        int polltmo = -1;
        int term_fd;
 
@@ -703,6 +704,7 @@ static void main_loop(void)
                }
 
                if (fds[FD_MBCSV].revents & POLLIN) {
+                       old_sync_state = cb->stby_sync_state;
                        if (NCSCC_RC_SUCCESS != avsv_mbcsv_dispatch(cb, 
SA_DISPATCH_ALL))
                                LOG_ER("avsv_mbcsv_dispatch FAILED");
                }
@@ -750,7 +752,13 @@ static void main_loop(void)
                                avd_d2n_msg_dequeue(cb);
                        }
                }
-
+               
+               // Standby COLD_SYNC completed. Ask STANDBY to flush its job 
queue if size is 0 on active.
+               if ((Fifo::size() == 0) && (old_sync_state == 
AVD_STBY_OUT_OF_SYNC)
+                       && (cb->stby_sync_state == AVD_STBY_IN_SYNC)) {
+                       ckpt_job_queue_size();
+                       old_sync_state = cb->stby_sync_state;
+               }
                // submit some jobs (if any)
                polltmo = retval_to_polltmo(Fifo::execute(cb));
        }

------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, SlashDot.org! http://sdm.link/slashdot
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to