ack, code review and some "smoke tests" run. Minor comment below.

/Thanks HansN


On 01/30/2017 12:53 PM, [email protected] wrote:
>   src/amf/amfd/chkop.cc    |   8 +++++++-
>   src/amf/amfd/ckpt.h      |   3 ++-
>   src/amf/amfd/ckpt_dec.cc |  15 ++++++++++++++-
>   src/amf/amfd/ckpt_enc.cc |  13 ++++++++++++-
>   src/amf/amfd/ckpt_msg.h  |   1 +
>   src/amf/amfd/imm.cc      |  31 ++++++++++---------------------
>   src/amf/amfd/imm.h       |   1 +
>   src/amf/amfd/main.cc     |  10 +++++++++-
>   8 files changed, 56 insertions(+), 26 deletions(-)
>
>
> Standby AMFD keeps 500 jobs related to IMM updates in its queue. Whenever 
> this size
> limit increases, job queue is trimmed to 250.
>
> Patch removes this limitation. Now active AMFD will ask standby AMFD through 
> MBCSv
> checkpointing to flush its job queue when a) active AMFD finishes all IMM 
> runtime updates
> and its job queue becomes empty and b) when COLD sync state of standy changes 
> from
> OUT_OF_SYNC to IN_SYNC when standby joins cluster first time.
>
> diff --git a/src/amf/amfd/chkop.cc b/src/amf/amfd/chkop.cc
> --- a/src/amf/amfd/chkop.cc
> +++ b/src/amf/amfd/chkop.cc
> @@ -1051,7 +1051,13 @@ uint32_t avsv_send_ckpt_data(AVD_CL_CB *
>               }
>               cb->async_updt_cnt.ng_updt++;
>               break;
> -
> +     case AVSV_CKPT_AVD_IMM_JOB_QUEUE_STATUS:
> +             if ((avd_cb->other_avd_adest != 0) && (avd_cb->avd_peer_ver < 
> AVD_MBCSV_SUB_PART_VERSION_8)) {
> +                     TRACE("No ckpt for  AVSV_CKPT_AVD_IMM_JOB_QUEUE_STATUS 
> as peer AMFD has"
> +                                     " lower version:%d", 
> avd_cb->avd_peer_ver);
> +                     return NCSCC_RC_SUCCESS;
> +             }
> +             break;
>       default:
>               return NCSCC_RC_SUCCESS;
>       }
> diff --git a/src/amf/amfd/ckpt.h b/src/amf/amfd/ckpt.h
> --- a/src/amf/amfd/ckpt.h
> +++ b/src/amf/amfd/ckpt.h
> @@ -34,9 +34,10 @@
>   #define AMF_AMFD_CKPT_H_
>   
>   // current version
> -#define AVD_MBCSV_SUB_PART_VERSION      7
> +#define AVD_MBCSV_SUB_PART_VERSION      8
>   
>   // supported versions
> +#define AVD_MBCSV_SUB_PART_VERSION_8    8
>   #define AVD_MBCSV_SUB_PART_VERSION_7    7
>   #define AVD_MBCSV_SUB_PART_VERSION_6    6
>   #define AVD_MBCSV_SUB_PART_VERSION_5    5
> diff --git a/src/amf/amfd/ckpt_dec.cc b/src/amf/amfd/ckpt_dec.cc
> --- a/src/amf/amfd/ckpt_dec.cc
> +++ b/src/amf/amfd/ckpt_dec.cc
> @@ -100,6 +100,7 @@ static uint32_t dec_cs_siass(AVD_CL_CB *
>   static uint32_t dec_cs_si_trans(AVD_CL_CB *cb, NCS_MBCSV_CB_DEC *dec, 
> uint32_t num_of_obj);
>   static uint32_t dec_cs_async_updt_cnt(AVD_CL_CB *cb, NCS_MBCSV_CB_DEC *dec, 
> uint32_t num_of_obj);
>   static uint32_t dec_cs_comp_cs_type_config(AVD_CL_CB *cb, NCS_MBCSV_CB_DEC 
> *dec, uint32_t num_of_obj);
> +static uint32_t dec_avd_to_avd_job_queue_status(AVD_CL_CB *cb, 
> NCS_MBCSV_CB_DEC *dec);
>   
>   /*
>    * Function list for decoding the async data.
> @@ -175,7 +176,9 @@ const AVSV_DECODE_CKPT_DATA_FUNC_PTR avd
>       nullptr,                        /* AVSV_SYNC_COMMIT */
>       dec_su_restart_count,
>       dec_si_dep_state,
> -     dec_ng_admin_state
> +     dec_ng_admin_state,
> +     dec_avd_to_avd_job_queue_status
> +
>   };
>   
>   /*
> @@ -2972,3 +2975,13 @@ static uint32_t dec_ng_admin_state(AVD_C
>       return NCSCC_RC_SUCCESS;
>   }
>   
> +static uint32_t dec_avd_to_avd_job_queue_status(AVD_CL_CB *cb, 
> NCS_MBCSV_CB_DEC *dec) {
> +  TRACE_ENTER();
> +  uint32_t size = 1;
> +  osaf_decode_uint32(&dec->i_uba, &size);
> +  Fifo::trim_to_size(size);
> +  TRACE_LEAVE();
> +  return NCSCC_RC_SUCCESS;
> +}
> +
> +
> diff --git a/src/amf/amfd/ckpt_enc.cc b/src/amf/amfd/ckpt_enc.cc
> --- a/src/amf/amfd/ckpt_enc.cc
> +++ b/src/amf/amfd/ckpt_enc.cc
> @@ -99,6 +99,7 @@ static uint32_t enc_cs_siass(AVD_CL_CB *
>   static uint32_t enc_cs_si_trans(AVD_CL_CB *cb, NCS_MBCSV_CB_ENC *enc, 
> uint32_t *num_of_obj);
>   static uint32_t enc_cs_async_updt_cnt(AVD_CL_CB *cb, NCS_MBCSV_CB_ENC *enc, 
> uint32_t *num_of_obj);
>   static uint32_t enc_cs_comp_cs_type_config(AVD_CL_CB *cb, NCS_MBCSV_CB_ENC 
> *enc, uint32_t *num_of_obj);
> +static uint32_t enc_avd_to_avd_job_queue_status(AVD_CL_CB *cb, 
> NCS_MBCSV_CB_ENC *enc);
>   
>   static uint32_t enc_su_oper_list(AVD_CL_CB *cb, AVD_SG *sg, 
> NCS_MBCSV_CB_ENC *enc);
>   /*
> @@ -175,7 +176,8 @@ const AVSV_ENCODE_CKPT_DATA_FUNC_PTR avd
>       nullptr,                        /* AVSV_SYNC_COMMIT */
>       enc_su_restart_count,
>       enc_si_dep_state,
> -     enc_ng_admin_state
> +     enc_ng_admin_state,
> +     enc_avd_to_avd_job_queue_status
>   };
>   
>   /*
> @@ -2493,4 +2495,13 @@ static uint32_t enc_ng_admin_state(AVD_C
>           return NCSCC_RC_SUCCESS;
>   }
>   
> +static uint32_t enc_avd_to_avd_job_queue_status(AVD_CL_CB *cb, 
> NCS_MBCSV_CB_ENC *enc) {
> +  TRACE_ENTER();
> +  osafassert(NCS_MBCSV_ACT_UPDATE == enc->io_action);
> +  const uint32_t *size = reinterpret_cast<uint32_t*>(enc->io_reo_hdl);
> +  osaf_encode_uint32(&enc->io_uba, *size);
> +  TRACE_LEAVE();
> +  return NCSCC_RC_SUCCESS;
> +}
>   
> +
> diff --git a/src/amf/amfd/ckpt_msg.h b/src/amf/amfd/ckpt_msg.h
> --- a/src/amf/amfd/ckpt_msg.h
> +++ b/src/amf/amfd/ckpt_msg.h
> @@ -111,6 +111,7 @@ typedef enum avsv_ckpt_msg_reo_type {
>       AVSV_CKPT_SU_RESTART_COUNT,
>       AVSV_CKPT_SI_DEP_STATE,
>       AVSV_CKPT_NG_ADMIN_STATE,
> +     AVSV_CKPT_AVD_IMM_JOB_QUEUE_STATUS,
>       AVSV_CKPT_MSG_MAX
>   } AVSV_CKPT_MSG_REO_TYPE;
>   
> diff --git a/src/amf/amfd/imm.cc b/src/amf/amfd/imm.cc
> --- a/src/amf/amfd/imm.cc
> +++ b/src/amf/amfd/imm.cc
> @@ -126,7 +126,6 @@ static char *StrDup(const char *s)
>       std::strcpy(c,s);
>       return c;
>   }
> -uint32_t const MAX_JOB_SIZE_AT_STANDBY = 500;
>   
>   //
>   Job::~Job()
> @@ -381,24 +380,11 @@ Job* Fifo::dequeue()
>       
>       return tmp;
>   }
> -
> -/**
> - * @brief   As of now standby AMFD will maintain immjobs for object of few 
> classes.
> - *          Flush all the jobs without updating to imm if 
> MAX_JOB_SIZE_AT_STANDBY is
> - *       reached.
> - *
> - */
> -void check_and_flush_job_queue_standby_amfd(void)
> -{
> -     TRACE_ENTER();
> -
> -     if (Fifo::size() >= MAX_JOB_SIZE_AT_STANDBY) {
> -             const uint32_t new_size = MAX_JOB_SIZE_AT_STANDBY / 2;
> -             LOG_WA("Reducing job queue of size:%u to 
> %u",Fifo::size(),new_size);
> -             Fifo::trim_to_size(new_size);
> -     }
> -
> -     TRACE_LEAVE();
> +void ckpt_job_queue_size() {
> +  uint32_t size = Fifo::size();
[HansN] space missing befor size
> +  TRACE_ENTER2("Fifo size:%u",size);
> +  m_AVSV_SEND_CKPT_UPDT_ASYNC_UPDT(avd_cb, &size, 
> AVSV_CKPT_AVD_IMM_JOB_QUEUE_STATUS);
> +  TRACE_LEAVE();
>   }
>   
>   //
> @@ -412,7 +398,6 @@ AvdJobDequeueResultT Fifo::execute(const
>               return JOB_ETRYAGAIN;
>   
>       if ((!avd_cb->is_implementer) && (avd_cb->avail_state_avd == 
> SA_AMF_HA_STANDBY)) {
> -             check_and_flush_job_queue_standby_amfd();
>               return JOB_EINVH;
>       }
>   
> @@ -426,6 +411,10 @@ AvdJobDequeueResultT Fifo::execute(const
>       TRACE_ENTER();
>   
>       ret = ajob->exec(cb);
> +
> +     //If no jobs then send a ckpt update to standby to flush its job queue.
> +     if (((ajob = peek()) == nullptr) && (cb->stby_sync_state == 
> AVD_STBY_IN_SYNC))
> +             ckpt_job_queue_size();
>       
>       TRACE_LEAVE2("%d", ret);
>   
> @@ -477,7 +466,7 @@ void Fifo::trim_to_size(const uint32_t s
>   {
>       Job *ajob;
>   
> -     TRACE_ENTER();
> +     TRACE_ENTER2("My Fifo size:%lu, trim to size:%u", job_.size(), size);
>   
>       while (job_.size() > size && (ajob = dequeue()) != nullptr) {
>               delete ajob;
> diff --git a/src/amf/amfd/imm.h b/src/amf/amfd/imm.h
> --- a/src/amf/amfd/imm.h
> +++ b/src/amf/amfd/imm.h
> @@ -208,5 +208,6 @@ void report_admin_op_error(SaImmOiHandle
>               struct admin_oper_cbk *pend_cbk,
>               const char *format, ...) __attribute__ ((format(printf, 5, 6)));
>   extern void check_and_flush_job_queue_standby_amfd(void);
> +void ckpt_job_queue_size();
>   
>   #endif  // AMF_AMFD_IMM_H_
> diff --git a/src/amf/amfd/main.cc b/src/amf/amfd/main.cc
> --- a/src/amf/amfd/main.cc
> +++ b/src/amf/amfd/main.cc
> @@ -621,6 +621,7 @@ static void main_loop(void)
>       AVD_EVT *evt;
>       NCS_SEL_OBJ mbx_fd;
>       SaAisErrorT error = SA_AIS_OK;
> +     AVD_STBY_SYNC_STATE old_sync_state = cb->stby_sync_state;
>       int polltmo = -1;
>       int term_fd;
>   
> @@ -703,6 +704,7 @@ static void main_loop(void)
>               }
>   
>               if (fds[FD_MBCSV].revents & POLLIN) {
> +                     old_sync_state = cb->stby_sync_state;
>                       if (NCSCC_RC_SUCCESS != avsv_mbcsv_dispatch(cb, 
> SA_DISPATCH_ALL))
>                               LOG_ER("avsv_mbcsv_dispatch FAILED");
>               }
> @@ -750,7 +752,13 @@ static void main_loop(void)
>                               avd_d2n_msg_dequeue(cb);
>                       }
>               }
> -
> +             
> +             // Standby COLD_SYNC completed. Ask STANDBY to flush its job 
> queue if size is 0 on active.
> +             if ((Fifo::size() == 0) && (old_sync_state == 
> AVD_STBY_OUT_OF_SYNC)
> +                     && (cb->stby_sync_state == AVD_STBY_IN_SYNC)) {
> +                     ckpt_job_queue_size();
> +                     old_sync_state = cb->stby_sync_state;
> +             }
>               // submit some jobs (if any)
>               polltmo = retval_to_polltmo(Fifo::execute(cb));
>       }


------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, SlashDot.org! http://sdm.link/slashdot
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to