Ack: code reviewed and some "smoke tests" run. Minor comment below.
/Thanks HansN On 01/30/2017 12:53 PM, [email protected] wrote: > src/amf/amfd/chkop.cc | 8 +++++++- > src/amf/amfd/ckpt.h | 3 ++- > src/amf/amfd/ckpt_dec.cc | 15 ++++++++++++++- > src/amf/amfd/ckpt_enc.cc | 13 ++++++++++++- > src/amf/amfd/ckpt_msg.h | 1 + > src/amf/amfd/imm.cc | 31 ++++++++++--------------------- > src/amf/amfd/imm.h | 1 + > src/amf/amfd/main.cc | 10 +++++++++- > 8 files changed, 56 insertions(+), 26 deletions(-) > > > Standby AMFD keeps 500 jobs related to IMM updates in its queue. Whenever > this size > limit increases, job queue is trimmed to 250. > > Patch removes this limitation. Now active AMFD will ask standby AMFD through > MBCSv > checkpointing to flush its job queue when a) active AMFD finishes all IMM > runtime updates > and its job queue becomes empty and b) when COLD sync state of standy changes > from > OUT_OF_SYNC to IN_SYNC when standby joins cluster first time. > > diff --git a/src/amf/amfd/chkop.cc b/src/amf/amfd/chkop.cc > --- a/src/amf/amfd/chkop.cc > +++ b/src/amf/amfd/chkop.cc > @@ -1051,7 +1051,13 @@ uint32_t avsv_send_ckpt_data(AVD_CL_CB * > } > cb->async_updt_cnt.ng_updt++; > break; > - > + case AVSV_CKPT_AVD_IMM_JOB_QUEUE_STATUS: > + if ((avd_cb->other_avd_adest != 0) && (avd_cb->avd_peer_ver < > AVD_MBCSV_SUB_PART_VERSION_8)) { > + TRACE("No ckpt for AVSV_CKPT_AVD_IMM_JOB_QUEUE_STATUS > as peer AMFD has" > + " lower version:%d", > avd_cb->avd_peer_ver); > + return NCSCC_RC_SUCCESS; > + } > + break; > default: > return NCSCC_RC_SUCCESS; > } > diff --git a/src/amf/amfd/ckpt.h b/src/amf/amfd/ckpt.h > --- a/src/amf/amfd/ckpt.h > +++ b/src/amf/amfd/ckpt.h > @@ -34,9 +34,10 @@ > #define AMF_AMFD_CKPT_H_ > > // current version > -#define AVD_MBCSV_SUB_PART_VERSION 7 > +#define AVD_MBCSV_SUB_PART_VERSION 8 > > // supported versions > +#define AVD_MBCSV_SUB_PART_VERSION_8 8 > #define AVD_MBCSV_SUB_PART_VERSION_7 7 > #define AVD_MBCSV_SUB_PART_VERSION_6 6 > #define AVD_MBCSV_SUB_PART_VERSION_5 5 > diff --git 
a/src/amf/amfd/ckpt_dec.cc b/src/amf/amfd/ckpt_dec.cc > --- a/src/amf/amfd/ckpt_dec.cc > +++ b/src/amf/amfd/ckpt_dec.cc > @@ -100,6 +100,7 @@ static uint32_t dec_cs_siass(AVD_CL_CB * > static uint32_t dec_cs_si_trans(AVD_CL_CB *cb, NCS_MBCSV_CB_DEC *dec, > uint32_t num_of_obj); > static uint32_t dec_cs_async_updt_cnt(AVD_CL_CB *cb, NCS_MBCSV_CB_DEC *dec, > uint32_t num_of_obj); > static uint32_t dec_cs_comp_cs_type_config(AVD_CL_CB *cb, NCS_MBCSV_CB_DEC > *dec, uint32_t num_of_obj); > +static uint32_t dec_avd_to_avd_job_queue_status(AVD_CL_CB *cb, > NCS_MBCSV_CB_DEC *dec); > > /* > * Function list for decoding the async data. > @@ -175,7 +176,9 @@ const AVSV_DECODE_CKPT_DATA_FUNC_PTR avd > nullptr, /* AVSV_SYNC_COMMIT */ > dec_su_restart_count, > dec_si_dep_state, > - dec_ng_admin_state > + dec_ng_admin_state, > + dec_avd_to_avd_job_queue_status > + > }; > > /* > @@ -2972,3 +2975,13 @@ static uint32_t dec_ng_admin_state(AVD_C > return NCSCC_RC_SUCCESS; > } > > +static uint32_t dec_avd_to_avd_job_queue_status(AVD_CL_CB *cb, > NCS_MBCSV_CB_DEC *dec) { > + TRACE_ENTER(); > + uint32_t size = 1; > + osaf_decode_uint32(&dec->i_uba, &size); > + Fifo::trim_to_size(size); > + TRACE_LEAVE(); > + return NCSCC_RC_SUCCESS; > +} > + > + > diff --git a/src/amf/amfd/ckpt_enc.cc b/src/amf/amfd/ckpt_enc.cc > --- a/src/amf/amfd/ckpt_enc.cc > +++ b/src/amf/amfd/ckpt_enc.cc > @@ -99,6 +99,7 @@ static uint32_t enc_cs_siass(AVD_CL_CB * > static uint32_t enc_cs_si_trans(AVD_CL_CB *cb, NCS_MBCSV_CB_ENC *enc, > uint32_t *num_of_obj); > static uint32_t enc_cs_async_updt_cnt(AVD_CL_CB *cb, NCS_MBCSV_CB_ENC *enc, > uint32_t *num_of_obj); > static uint32_t enc_cs_comp_cs_type_config(AVD_CL_CB *cb, NCS_MBCSV_CB_ENC > *enc, uint32_t *num_of_obj); > +static uint32_t enc_avd_to_avd_job_queue_status(AVD_CL_CB *cb, > NCS_MBCSV_CB_ENC *enc); > > static uint32_t enc_su_oper_list(AVD_CL_CB *cb, AVD_SG *sg, > NCS_MBCSV_CB_ENC *enc); > /* > @@ -175,7 +176,8 @@ const AVSV_ENCODE_CKPT_DATA_FUNC_PTR avd > 
nullptr, /* AVSV_SYNC_COMMIT */ > enc_su_restart_count, > enc_si_dep_state, > - enc_ng_admin_state > + enc_ng_admin_state, > + enc_avd_to_avd_job_queue_status > }; > > /* > @@ -2493,4 +2495,13 @@ static uint32_t enc_ng_admin_state(AVD_C > return NCSCC_RC_SUCCESS; > } > > +static uint32_t enc_avd_to_avd_job_queue_status(AVD_CL_CB *cb, > NCS_MBCSV_CB_ENC *enc) { > + TRACE_ENTER(); > + osafassert(NCS_MBCSV_ACT_UPDATE == enc->io_action); > + const uint32_t *size = reinterpret_cast<uint32_t*>(enc->io_reo_hdl); > + osaf_encode_uint32(&enc->io_uba, *size); > + TRACE_LEAVE(); > + return NCSCC_RC_SUCCESS; > +} > > + > diff --git a/src/amf/amfd/ckpt_msg.h b/src/amf/amfd/ckpt_msg.h > --- a/src/amf/amfd/ckpt_msg.h > +++ b/src/amf/amfd/ckpt_msg.h > @@ -111,6 +111,7 @@ typedef enum avsv_ckpt_msg_reo_type { > AVSV_CKPT_SU_RESTART_COUNT, > AVSV_CKPT_SI_DEP_STATE, > AVSV_CKPT_NG_ADMIN_STATE, > + AVSV_CKPT_AVD_IMM_JOB_QUEUE_STATUS, > AVSV_CKPT_MSG_MAX > } AVSV_CKPT_MSG_REO_TYPE; > > diff --git a/src/amf/amfd/imm.cc b/src/amf/amfd/imm.cc > --- a/src/amf/amfd/imm.cc > +++ b/src/amf/amfd/imm.cc > @@ -126,7 +126,6 @@ static char *StrDup(const char *s) > std::strcpy(c,s); > return c; > } > -uint32_t const MAX_JOB_SIZE_AT_STANDBY = 500; > > // > Job::~Job() > @@ -381,24 +380,11 @@ Job* Fifo::dequeue() > > return tmp; > } > - > -/** > - * @brief As of now standby AMFD will maintain immjobs for object of few > classes. > - * Flush all the jobs without updating to imm if > MAX_JOB_SIZE_AT_STANDBY is > - * reached. 
> - * > - */ > -void check_and_flush_job_queue_standby_amfd(void) > -{ > - TRACE_ENTER(); > - > - if (Fifo::size() >= MAX_JOB_SIZE_AT_STANDBY) { > - const uint32_t new_size = MAX_JOB_SIZE_AT_STANDBY / 2; > - LOG_WA("Reducing job queue of size:%u to > %u",Fifo::size(),new_size); > - Fifo::trim_to_size(new_size); > - } > - > - TRACE_LEAVE(); > +void ckpt_job_queue_size() { > + uint32_t size = Fifo::size(); [HansN] space missing before size > + TRACE_ENTER2("Fifo size:%u",size); > + m_AVSV_SEND_CKPT_UPDT_ASYNC_UPDT(avd_cb, &size, > AVSV_CKPT_AVD_IMM_JOB_QUEUE_STATUS); > + TRACE_LEAVE(); > } > > // > @@ -412,7 +398,6 @@ AvdJobDequeueResultT Fifo::execute(const > return JOB_ETRYAGAIN; > > if ((!avd_cb->is_implementer) && (avd_cb->avail_state_avd == > SA_AMF_HA_STANDBY)) { > - check_and_flush_job_queue_standby_amfd(); > return JOB_EINVH; > } > > @@ -426,6 +411,10 @@ AvdJobDequeueResultT Fifo::execute(const > TRACE_ENTER(); > > ret = ajob->exec(cb); > + > + //If no jobs then send a ckpt update to standby to flush its job queue. > + if (((ajob = peek()) == nullptr) && (cb->stby_sync_state == > AVD_STBY_IN_SYNC)) > + ckpt_job_queue_size(); > > TRACE_LEAVE2("%d", ret); > > @@ -477,7 +466,7 @@ void Fifo::trim_to_size(const uint32_t s > { > Job *ajob; > > - TRACE_ENTER(); > + TRACE_ENTER2("My Fifo size:%lu, trim to size:%u", job_.size(), size); > > while (job_.size() > size && (ajob = dequeue()) != nullptr) { > delete ajob; > diff --git a/src/amf/amfd/imm.h b/src/amf/amfd/imm.h > --- a/src/amf/amfd/imm.h > +++ b/src/amf/amfd/imm.h > @@ -208,5 +208,6 @@ void report_admin_op_error(SaImmOiHandle > struct admin_oper_cbk *pend_cbk, > const char *format, ...) 
__attribute__ ((format(printf, 5, 6))); > extern void check_and_flush_job_queue_standby_amfd(void); > +void ckpt_job_queue_size(); > > #endif // AMF_AMFD_IMM_H_ > diff --git a/src/amf/amfd/main.cc b/src/amf/amfd/main.cc > --- a/src/amf/amfd/main.cc > +++ b/src/amf/amfd/main.cc > @@ -621,6 +621,7 @@ static void main_loop(void) > AVD_EVT *evt; > NCS_SEL_OBJ mbx_fd; > SaAisErrorT error = SA_AIS_OK; > + AVD_STBY_SYNC_STATE old_sync_state = cb->stby_sync_state; > int polltmo = -1; > int term_fd; > > @@ -703,6 +704,7 @@ static void main_loop(void) > } > > if (fds[FD_MBCSV].revents & POLLIN) { > + old_sync_state = cb->stby_sync_state; > if (NCSCC_RC_SUCCESS != avsv_mbcsv_dispatch(cb, > SA_DISPATCH_ALL)) > LOG_ER("avsv_mbcsv_dispatch FAILED"); > } > @@ -750,7 +752,13 @@ static void main_loop(void) > avd_d2n_msg_dequeue(cb); > } > } > - > + > + // Standby COLD_SYNC completed. Ask STANDBY to flush its job > queue if size is 0 on active. > + if ((Fifo::size() == 0) && (old_sync_state == > AVD_STBY_OUT_OF_SYNC) > + && (cb->stby_sync_state == AVD_STBY_IN_SYNC)) { > + ckpt_job_queue_size(); > + old_sync_state = cb->stby_sync_state; > + } > // submit some jobs (if any) > polltmo = retval_to_polltmo(Fifo::execute(cb)); > } ------------------------------------------------------------------------------ Check out the vibrant tech community on one of the world's most engaging tech sites, SlashDot.org! http://sdm.link/slashdot _______________________________________________ Opensaf-devel mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/opensaf-devel
