Hi Mahesh,

The checkpoint handling for stream close is spread over several places. We could refactor it so that it is handled in one place, which would make the code cleaner. However, that is not related to this ticket. Could we do that refactoring later?

Regards
Canh

-----Original Message-----
From: Canh Van Truong [mailto:[email protected]]
Sent: Thursday, April 27, 2017 1:27 PM
To: 'A V Mahesh' <[email protected]>; '[email protected]' <[email protected]>; '[email protected]' <[email protected]>
Cc: '[email protected]' <[email protected]>
Subject: RE: [PATCH 1/1] log: fix checkpoint dest_names in open stream request [#2434]

Hi Mahesh,

In the current design, a configuration stream can only be deleted (e.g. immcfg -d <STREAM_DN>) if it has been opened exactly once, namely when the cfg stream was created (stream->numOpeners = 1). You can check this in the "stream_ccb_completed_delete" function. If stream->numOpeners > 1, the attempt to delete the configuration stream fails. This means that a configuration stream is only deleted when no client is associated with it.

So, when the stream close is checkpointed while deleting a configuration stream, the client_id is set to -1; an invalid client_id is written to the checkpoint data in that case. In "proc_stream_close_msg", a valid client_id is written to the checkpoint data when checkpointing.

Thanks
Canh

-----Original Message-----
From: A V Mahesh [mailto:[email protected]]
Sent: Thursday, April 27, 2017 10:35 AM
To: Canh Van Truong <[email protected]>; [email protected]; [email protected]
Cc: [email protected]
Subject: Re: [PATCH 1/1] log: fix checkpoint dest_names in open stream request [#2434]

Hi Canh,

On 4/27/2017 12:34 PM, Canh Van Truong wrote:
> For checkpoint stream open in proc_stream_open_msg, lgs need update a
> valid client_id for checkpoint data

I was asking: for the stream close checkpoint in proc_stream_close_msg, does lgs also need to update a valid client_id in the checkpoint data? In `ckpt_stream_close`, the client_id is currently checkpointed as -1.

Regardless of whether the clientId is used for stream_close, please also change the function `ckpt_stream_close(log_stream_t *stream, time_t closetime)` to `ckpt_stream_close(log_stream_t *stream, time_t closetime, const uint32_t &client_id);`

====================================================================================
static uint32_t ckpt_stream_close(log_stream_t *stream, time_t closetime) {
  uint32_t rc;
  lgsv_ckpt_msg_v1_t ckpt_v1;
  lgsv_ckpt_msg_v2_t ckpt_v2;
  void *ckpt_ptr;

  TRACE_ENTER();

  if (lgs_is_peer_v2()) {
    memset(&ckpt_v2, 0, sizeof(ckpt_v2));
    ckpt_v2.header.ckpt_rec_type = LGS_CKPT_CLOSE_STREAM;
    ckpt_v2.header.num_ckpt_records = 1;
    ckpt_v2.header.data_len = 1;
    /* No client. Logservice itself has opened stream */
    ckpt_v2.ckpt_rec.stream_close.clientId = -1;
====================================================================================

-AVM

On 4/27/2017 12:34 PM, Canh Van Truong wrote:
>
> Hi Mahesh,
>
> For checkpoint stream open in proc_stream_open_msg, lgs need update a
> valid client_id for checkpoint data
>
> For checkpoint stream open in create configuration stream (in
> stream_ccb_apply_create) and in encode the cold sync
> (edu_enc_streams), lgs update an invalid client_id (-1) for checkpoint
> data. Because the client_id is not needed here for creating
> configuration stream when no client is associated with the stream. The
> cold sync is the same reason.
>
> I am not sure if I understand your question? Could you clear it to me?
>
> Thanks
>
> Canh
>
> -----Original Message-----
> From: A V Mahesh [mailto:[email protected]]
> Sent: Thursday, April 27, 2017 8:20 AM
> To: Canh Van Truong <[email protected]>;
> [email protected]; [email protected]
> Cc: [email protected]
> Subject: Re: [PATCH 1/1] log: fix checkpoint dest_names in open stream
> request [#2434]
>
> Hi Canh Van,
>
> As if you are updating client_id while lgs_ckpt_stream_open() checkpoint
>
> is client_id not required as reference to while ckpt_stream_close()
>
> checkpoint ?
>
> currently this assigned to -1.
>
> -AVM
>
> On 4/25/2017 6:58 PM, Canh Van Truong wrote:
> > Handling of checkpoint for stream open is in serveral places. Handling of checkpoint
> > is called in proc_stream_open_msg forgot to add checkpoint of destination name.
> >
> > Refactor so that handling of checkpoint data for stream open is done in one place
> > and destination name was already added in checkpoint data at this place
> > ---
> >  src/log/logd/lgs_evt.cc    | 69 ++--------------------------------------------
> >  src/log/logd/lgs_imm.cc    | 40 ++-------------------------
> >  src/log/logd/lgs_mbcsv.cc  | 15 +++++-----
> >  src/log/logd/lgs_mbcsv.h   |  5 ++--
> >  src/log/logd/lgs_stream.cc | 41 ++++++++++++++++++++++++++-
> >  src/log/logd/lgs_stream.h  |  2 ++
> >  6 files changed, 57 insertions(+), 115 deletions(-)
> >
> > diff --git a/src/log/logd/lgs_evt.cc b/src/log/logd/lgs_evt.cc
> > index 6972efe55..beb46e7a7 100644
> > --- a/src/log/logd/lgs_evt.cc
> > +++ b/src/log/logd/lgs_evt.cc
> > @@ -821,72 +821,6 @@ snd_rsp:
> >  }
> >
> >  /**
> > - * Stream open checkpointing
> > - * @param cb
> > - * @param logStream
> > - * @param open_sync_param
> > - * @return
> > - */
> > -static uint32_t lgs_ckpt_stream_open(lgs_cb_t *cb, log_stream_t *logStream,
> > -                                     lgsv_stream_open_req_t *open_sync_param) {
> > -  uint32_t async_rc = NCSCC_RC_SUCCESS;
> > -  lgsv_ckpt_msg_v1_t ckpt_v1;
> > -  lgsv_ckpt_msg_v2_t ckpt_v2;
> > -  void *ckpt_ptr;
> > -  lgsv_ckpt_header_t *header_ptr;
> > -  lgs_ckpt_stream_open_t *ckpt_rec_open_ptr;
> > -
> > -  TRACE_ENTER();
> > -
> > -  if (lgs_is_peer_v2()) {
> > -    memset(&ckpt_v2, 0, sizeof(ckpt_v2));
> > -    header_ptr = &ckpt_v2.header;
> > -    ckpt_rec_open_ptr = &ckpt_v2.ckpt_rec.stream_open;
> > -    ckpt_ptr = &ckpt_v2;
> > -  } else {
> > -    memset(&ckpt_v1, 0, sizeof(ckpt_v1));
> > -    header_ptr = &ckpt_v1.header;
> > -    ckpt_rec_open_ptr = &ckpt_v1.ckpt_rec.stream_open;
> > -    ckpt_ptr = &ckpt_v1;
> > -  }
> > -
> > -  if (cb->ha_state == SA_AMF_HA_ACTIVE) {
> > -    header_ptr->ckpt_rec_type = LGS_CKPT_OPEN_STREAM;
> > -    header_ptr->num_ckpt_records = 1;
> > -    header_ptr->data_len = 1;
> > -    ckpt_rec_open_ptr->clientId = open_sync_param->client_id;
> > -    ckpt_rec_open_ptr->streamId = logStream->streamId;
> > -
> > -    ckpt_rec_open_ptr->logFile =
> > -        const_cast<char *>(logStream->fileName.c_str());
> > -    ckpt_rec_open_ptr->logPath =
> > -        const_cast<char *>(logStream->pathName.c_str());
> > -    ckpt_rec_open_ptr->logFileCurrent =
> > -        const_cast<char *>(logStream->logFileCurrent.c_str());
> > -    ckpt_rec_open_ptr->fileFmt = logStream->logFileFormat;
> > -    ckpt_rec_open_ptr->logStreamName =
> > -        const_cast<char *>(logStream->name.c_str());
> > -
> > -    ckpt_rec_open_ptr->maxFileSize = logStream->maxLogFileSize;
> > -    ckpt_rec_open_ptr->maxLogRecordSize = logStream->fixedLogRecordSize;
> > -    ckpt_rec_open_ptr->logFileFullAction = logStream->logFullAction;
> > -    ckpt_rec_open_ptr->maxFilesRotated = logStream->maxFilesRotated;
> > -    ckpt_rec_open_ptr->creationTimeStamp = logStream->creationTimeStamp;
> > -    ckpt_rec_open_ptr->numOpeners = logStream->numOpeners;
> > -
> > -    ckpt_rec_open_ptr->streamType = logStream->streamType;
> > -    ckpt_rec_open_ptr->logRecordId = logStream->logRecordId;
> > -
> > -    async_rc = lgs_ckpt_send_async(cb, ckpt_ptr, NCS_MBCSV_ACT_ADD);
> > -    if (async_rc == NCSCC_RC_SUCCESS) {
> > -      TRACE_4("REG_REC ASYNC UPDATE SEND SUCCESS...");
> > -    }
> > -  }
> > -  TRACE_LEAVE2("async_rc = %d", async_rc);
> > -  return async_rc;
> > -}
> > -
> > -/**
> >   * Create a new application stream
> >   *
> >   * @param open_sync_param[in] Parameters used to create the stream
> > @@ -1226,8 +1160,9 @@ snd_rsp:
> >    rc = lgs_mds_msg_send(cb, &msg, &evt->fr_dest, &evt->mds_ctxt,
> >                          MDS_SEND_PRIORITY_HIGH);
> >
> > +  // Checkpoint the opened stream
> >    if (ais_rv == SA_AIS_OK) {
> > -    (void)lgs_ckpt_stream_open(cb, logStream, open_sync_param);
> > +    (void)lgs_ckpt_stream_open(logStream, open_sync_param->client_id);
> >    }
> >
> >    // These memories are allocated in MDS log open decode callback.
> > diff --git a/src/log/logd/lgs_imm.cc b/src/log/logd/lgs_imm.cc
> > index caf0cc92a..2eb3b7544 100644
> > --- a/src/log/logd/lgs_imm.cc
> > +++ b/src/log/logd/lgs_imm.cc
> > @@ -318,43 +318,6 @@ static uint32_t ckpt_stream_config(log_stream_t *stream) {
> >  }
> >
> >  /**
> > - * Pack and send an open stream checkpoint using mbcsv
> > - * @param stream
> > - * @return uint32
> > - */
> > -static uint32_t ckpt_stream_open(log_stream_t *stream) {
> > -  uint32_t rc;
> > -  lgsv_ckpt_msg_v1_t ckpt_v1;
> > -  lgsv_ckpt_msg_v2_t ckpt_v2;
> > -  void *ckpt_ptr;
> > -  lgs_ckpt_stream_open_t *stream_open_ptr;
> > -  lgsv_ckpt_header_t *header_ptr;
> > -
> > -  TRACE_ENTER();
> > -
> > -  if (lgs_is_peer_v2()) {
> > -    memset(&ckpt_v2, 0, sizeof(ckpt_v2));
> > -    header_ptr = &ckpt_v2.header;
> > -    stream_open_ptr = &ckpt_v2.ckpt_rec.stream_open;
> > -    ckpt_ptr = &ckpt_v2;
> > -  } else {
> > -    memset(&ckpt_v1, 0, sizeof(ckpt_v1));
> > -    header_ptr = &ckpt_v1.header;
> > -    stream_open_ptr = &ckpt_v1.ckpt_rec.stream_open;
> > -    ckpt_ptr = &ckpt_v1;
> > -  }
> > -  header_ptr->ckpt_rec_type = LGS_CKPT_OPEN_STREAM;
> > -  header_ptr->num_ckpt_records = 1;
> > -  header_ptr->data_len = 1;
> > -
> > -  lgs_ckpt_stream_open_set(stream, stream_open_ptr);
> > -  rc = lgs_ckpt_send_async(lgs_cb, ckpt_ptr, NCS_MBCSV_ACT_ADD);
> > -
> > -  TRACE_LEAVE();
> > -  return rc;
> > -}
> > -
> > -/**
> >   * Pack and send a close stream checkpoint using mbcsv
> >   * @param stream
> >   * @param recType
> > @@ -2409,7 +2372,8 @@ static void stream_ccb_apply_create(const CcbUtilOperationData_t *opdata) {
> >
> >    if ((rc = stream_create_and_configure1(opdata, &stream)) == SA_AIS_OK) {
> >      log_stream_open_fileinit(stream);
> > -    ckpt_stream_open(stream);
> > +    // Checkpoint the opened stream with invalid clientId (-1)
> > +    lgs_ckpt_stream_open(stream, -1);
> >    } else {
> >      LOG_IN("Stream create and configure failed %d", rc);
> >    }
> > diff --git a/src/log/logd/lgs_mbcsv.cc b/src/log/logd/lgs_mbcsv.cc
> > index 59232e138..b94c8cad2 100644
> > --- a/src/log/logd/lgs_mbcsv.cc
> > +++ b/src/log/logd/lgs_mbcsv.cc
> > @@ -670,16 +670,18 @@ static uint32_t ckpt_enc_cold_sync_data(lgs_cb_t *lgs_cb,
> >  } /*End ckpt_enc_cold_sync_data() */
> >
> >  /**
> > - * Set parameters for open stream
> > + * Set parameters for check-pointing the opened stream
> >   *
> >   * @param logStream
> >   * @param stream_open
> > + * @param client_id
> >   * @return
> >   */
> > -uint32_t lgs_ckpt_stream_open_set(log_stream_t *logStream,
> > -                                  lgs_ckpt_stream_open_t *stream_open) {
> > +void lgs_ckpt_stream_open_set(log_stream_t *logStream,
> > +                              lgs_ckpt_stream_open_t *stream_open,
> > +                              const uint32_t &client_id) {
> >    memset(stream_open, 0, sizeof(lgs_ckpt_stream_open_t));
> > -  stream_open->clientId = -1; /* not used in this message */
> > +  stream_open->clientId = client_id;
> >    stream_open->streamId = logStream->streamId;
> >    stream_open->logFile = const_cast<char *>(logStream->fileName.c_str());
> >    stream_open->logPath = const_cast<char *>(logStream->pathName.c_str());
> > @@ -697,8 +699,6 @@ uint32_t lgs_ckpt_stream_open_set(log_stream_t *logStream,
> >    stream_open->numOpeners = logStream->numOpeners;
> >    stream_open->streamType = logStream->streamType;
> >    stream_open->logRecordId = logStream->logRecordId;
> > -
> > -  return NCSCC_RC_SUCCESS;
> >  }
> >
> >  /**
> > @@ -737,7 +737,8 @@ static uint32_t edu_enc_streams(lgs_cb_t *cb, NCS_UBAID *uba) {
> >    SaBoolT endloop = SA_FALSE, jstart = SA_TRUE;
> >    while ((log_stream_rec = iterate_all_streams(endloop, jstart)) && !endloop) {
> >      jstart = SA_FALSE;
> > -    lgs_ckpt_stream_open_set(log_stream_rec, ckpt_stream_rec);
> > +    // Encode the stream with invalid clientId for cold sync
> > +    lgs_ckpt_stream_open_set(log_stream_rec, ckpt_stream_rec, -1);
> >      rc = m_NCS_EDU_EXEC(&cb->edu_hdl, edp_ed_open_stream_rec, uba,
> >                          EDP_OP_TYPE_ENC, ckpt_stream_rec, &ederror);
> >
> > diff --git a/src/log/logd/lgs_mbcsv.h b/src/log/logd/lgs_mbcsv.h
> > index b63b0cc2d..75a96deb6 100644
> > --- a/src/log/logd/lgs_mbcsv.h
> > +++ b/src/log/logd/lgs_mbcsv.h
> > @@ -113,8 +113,9 @@ bool lgs_is_peer_v6();
> >  bool lgs_is_split_file_system();
> >  uint32_t lgs_mbcsv_dispatch(NCS_MBCSV_HDL mbcsv_hdl);
> >  void lgs_free_edu_mem(char *ptr);
> > -uint32_t lgs_ckpt_stream_open_set(log_stream_t *logStream,
> > -                                  lgs_ckpt_stream_open_t *stream_open);
> > +void lgs_ckpt_stream_open_set(log_stream_t *logStream,
> > +                              lgs_ckpt_stream_open_t *stream_open,
> > +                              const uint32_t &client_id);
> >  uint32_t edp_ed_header_rec(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT ptr,
> >                             uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env,
> >                             EDP_OP_TYPE op, EDU_ERR *o_err);
> > diff --git a/src/log/logd/lgs_stream.cc b/src/log/logd/lgs_stream.cc
> > index 69da6c10e..26014c936 100644
> > --- a/src/log/logd/lgs_stream.cc
> > +++ b/src/log/logd/lgs_stream.cc
> > @@ -31,9 +31,11 @@
> >  #include <algorithm>
> >
> >  #include "log/logd/lgs.h"
> > -#include "lgs_config.h"
> > +#include "log/logd/lgs_config.h"
> >  #include "log/logd/lgs_file.h"
> >  #include "log/logd/lgs_filehdl.h"
> > +#include "log/logd/lgs_mbcsv_v1.h"
> > +#include "log/logd/lgs_mbcsv_v2.h"
> >  #include "base/osaf_time.h"
> >  #include "osaf/immutil/immutil.h"
> >
> > @@ -1597,3 +1599,40 @@ void log_stream_form_dest_names(log_stream_t *stream) {
> >    }
> >    stream->stb_dest_names = output;
> >  }
> > +
> > +/**
> > +* Check-pointing the opened stream
> > +* @param stream
> > +* @param client_id
> > +* @return
> > +*/
> > +void lgs_ckpt_stream_open(log_stream_t *stream, const uint32_t &client_id) {
> > +  uint32_t rc;
> > +  lgsv_ckpt_msg_v1_t ckpt_v1;
> > +  lgsv_ckpt_msg_v2_t ckpt_v2;
> > +  void *ckpt_ptr;
> > +  lgs_ckpt_stream_open_t *stream_open_ptr;
> > +  lgsv_ckpt_header_t *header_ptr;
> > +
> > +  TRACE_ENTER();
> > +
> > +  if (lgs_is_peer_v2()) {
> > +    memset(&ckpt_v2, 0, sizeof(ckpt_v2));
> > +    header_ptr = &ckpt_v2.header;
> > +    stream_open_ptr = &ckpt_v2.ckpt_rec.stream_open;
> > +    ckpt_ptr = &ckpt_v2;
> > +  } else {
> > +    memset(&ckpt_v1, 0, sizeof(ckpt_v1));
> > +    header_ptr = &ckpt_v1.header;
> > +    stream_open_ptr = &ckpt_v1.ckpt_rec.stream_open;
> > +    ckpt_ptr = &ckpt_v1;
> > +  }
> > +  header_ptr->ckpt_rec_type = LGS_CKPT_OPEN_STREAM;
> > +  header_ptr->num_ckpt_records = 1;
> > +  header_ptr->data_len = 1;
> > +
> > +  lgs_ckpt_stream_open_set(stream, stream_open_ptr, client_id);
> > +  rc = lgs_ckpt_send_async(lgs_cb, ckpt_ptr, NCS_MBCSV_ACT_ADD);
> > +
> > +  TRACE_LEAVE2("Check-pointing the opened stream: rc=%d", rc);
> > +}
> > diff --git a/src/log/logd/lgs_stream.h b/src/log/logd/lgs_stream.h
> > index 1ed4b1571..0ef5b7a11 100644
> > --- a/src/log/logd/lgs_stream.h
> > +++ b/src/log/logd/lgs_stream.h
> > @@ -143,4 +143,6 @@ void log_stream_delete_dest_name(log_stream_t *stream,
> >                                   const std::vector<std::string> &names);
> >  void log_stream_form_dest_names(log_stream_t *stream);
> >
> > +void lgs_ckpt_stream_open(log_stream_t *stream, const uint32_t &client_id);
> > +
> >  #endif // LOG_LOGD_LGS_STREAM_H_
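
For reference, the client_id convention discussed in this thread can be sketched in isolation as below. This is only a minimal model under stated assumptions, not the lgs implementation: StreamModel, CloseRecordModel, kNoClient, can_delete_cfg_stream, make_close_record and close_record_for_cfg_delete are all hypothetical names invented for the illustration, and only the fields mentioned above (numOpeners, streamId, clientId) are modelled. It shows the caller passing the client id into the record-filling helper, with -1 used when no client is associated with the stream.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-ins for the real lgs types; not the OpenSAF definitions.
// -1 stored in a uint32_t wraps to 0xFFFFFFFF and serves as "no client".
constexpr uint32_t kNoClient = static_cast<uint32_t>(-1);

struct StreamModel {
  uint32_t stream_id;
  uint32_t num_openers;
};

struct CloseRecordModel {
  uint32_t stream_id;
  uint32_t client_id;
};

// Rule described in the thread: a configuration stream may be deleted only
// when it was opened exactly once (at creation), i.e. no client holds it.
bool can_delete_cfg_stream(const StreamModel &stream) {
  return stream.num_openers == 1;
}

// Single place that fills the close record; the caller chooses the client id.
CloseRecordModel make_close_record(const StreamModel &stream,
                                   uint32_t client_id) {
  return CloseRecordModel{stream.stream_id, client_id};
}

// Delete path: no client is associated with the stream, so kNoClient is used.
CloseRecordModel close_record_for_cfg_delete(const StreamModel &stream) {
  assert(can_delete_cfg_stream(stream));
  return make_close_record(stream, kNoClient);
}
```

In this model, a client-initiated close would call make_close_record(stream, client_id) with the real id, while the configuration-stream delete path goes through close_record_for_cfg_delete, which mirrors the two cases described above.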
