Hi Canh,
On 4/27/2017 12:34 PM, Canh Van Truong wrote:
> For checkpoint stream open in proc_stream_open_msg, lgs need update a
> valid client_id for checkpoint data
I was asking, is For checkpoint stream close in proc_stream_close_msg,
lgs need update a valid client_id for checkpoint data ?
in `ckpt_stream_close` client_id is checkpoint as -1 now.
irrelevant of clientId used or not for stream_close , change this
`ckpt_stream_close(log_stream_t *stream, time_t closetime)`
function also to ckpt_stream_close(log_stream_t *stream, time_t
closetime, const uint32_t &client_id);
====================================================================================
static uint32_t ckpt_stream_close(log_stream_t *stream, time_t closetime) {
uint32_t rc;
lgsv_ckpt_msg_v1_t ckpt_v1;
lgsv_ckpt_msg_v2_t ckpt_v2;
void *ckpt_ptr;
TRACE_ENTER();
if (lgs_is_peer_v2()) {
memset(&ckpt_v2, 0, sizeof(ckpt_v2));
ckpt_v2.header.ckpt_rec_type = LGS_CKPT_CLOSE_STREAM;
ckpt_v2.header.num_ckpt_records = 1;
ckpt_v2.header.data_len = 1;
/* No client. Logservice itself has opened stream */
ckpt_v2.ckpt_rec.stream_close.clientId = -1;
====================================================================================
-AVM
On 4/27/2017 12:34 PM, Canh Van Truong wrote:
>
> Hi Mahesh,
>
> For checkpoint stream open in proc_stream_open_msg, lgs need update a
> valid client_id for checkpoint data
>
> For checkpoint stream open in create configuration stream (in
> stream_ccb_apply_create) and in encode the cold sync
> (edu_enc_streams), lgs update an invalid client_id (-1) for checkpoint
> data. Because the client_id is not needed here for creating
> configuration stream when no client is associated with the stream. The
> cold sync is the same reason.
>
> I am not sure if I understand your question? Could you clear it to me?
>
> Thanks
>
> Canh
>
> -----Original Message-----
> From: A V Mahesh [mailto:[email protected]]
> Sent: Thursday, April 27, 2017 8:20 AM
> To: Canh Van Truong <[email protected]>;
> [email protected]; [email protected]
> Cc: [email protected]
> Subject: Re: [PATCH 1/1] log: fix checkpoint dest_names in open stream
> request [#2434]
>
> Hi Canh Van,
>
> As if you are updating client_id while lgs_ckpt_stream_open() checkpoint
>
> is client_id not required as reference to while ckpt_stream_close()
>
> checkpoint ?
>
> currently this assigned to -1.
>
> -AVM
>
> On 4/25/2017 6:58 PM, Canh Van Truong wrote:
>
> > Handling of checkpoint for stream open is in serveral places.
> Handling of checkpoint
>
> > is called in proc_stream_open_msg forgot to add checkpoint of
> destination name.
>
> >
>
> > Refactor so that handling of checkpoint data for stream open is done
> in one place
>
> > and destination name was already added in checkpoint data at this place
>
> > ---
>
> > src/log/logd/lgs_evt.cc | 69
> ++--------------------------------------------
>
> > src/log/logd/lgs_imm.cc | 40 ++-------------------------
>
> > src/log/logd/lgs_mbcsv.cc | 15 +++++-----
>
> > src/log/logd/lgs_mbcsv.h | 5 ++--
>
> > src/log/logd/lgs_stream.cc | 41 ++++++++++++++++++++++++++-
>
> > src/log/logd/lgs_stream.h | 2 ++
>
> > 6 files changed, 57 insertions(+), 115 deletions(-)
>
> >
>
> > diff --git a/src/log/logd/lgs_evt.cc b/src/log/logd/lgs_evt.cc
>
> > index 6972efe55..beb46e7a7 100644
>
> > --- a/src/log/logd/lgs_evt.cc
>
> > +++ b/src/log/logd/lgs_evt.cc
>
> > @@ -821,72 +821,6 @@ snd_rsp:
>
> > }
>
> >
>
> > /**
>
> > - * Stream open checkpointing
>
> > - * @param cb
>
> > - * @param logStream
>
> > - * @param open_sync_param
>
> > - * @return
>
> > - */
>
> > -static uint32_t lgs_ckpt_stream_open(lgs_cb_t *cb, log_stream_t
> *logStream,
>
> > - lgsv_stream_open_req_t
> *open_sync_param) {
>
> > - uint32_t async_rc = NCSCC_RC_SUCCESS;
>
> > - lgsv_ckpt_msg_v1_t ckpt_v1;
>
> > - lgsv_ckpt_msg_v2_t ckpt_v2;
>
> > - void *ckpt_ptr;
>
> > - lgsv_ckpt_header_t *header_ptr;
>
> > - lgs_ckpt_stream_open_t *ckpt_rec_open_ptr;
>
> > -
>
> > - TRACE_ENTER();
>
> > -
>
> > - if (lgs_is_peer_v2()) {
>
> > - memset(&ckpt_v2, 0, sizeof(ckpt_v2));
>
> > - header_ptr = &ckpt_v2.header;
>
> > - ckpt_rec_open_ptr = &ckpt_v2.ckpt_rec.stream_open;
>
> > - ckpt_ptr = &ckpt_v2;
>
> > - } else {
>
> > - memset(&ckpt_v1, 0, sizeof(ckpt_v1));
>
> > - header_ptr = &ckpt_v1.header;
>
> > - ckpt_rec_open_ptr = &ckpt_v1.ckpt_rec.stream_open;
>
> > - ckpt_ptr = &ckpt_v1;
>
> > - }
>
> > -
>
> > - if (cb->ha_state == SA_AMF_HA_ACTIVE) {
>
> > - header_ptr->ckpt_rec_type = LGS_CKPT_OPEN_STREAM;
>
> > - header_ptr->num_ckpt_records = 1;
>
> > - header_ptr->data_len = 1;
>
> > - ckpt_rec_open_ptr->clientId = open_sync_param->client_id;
>
> > - ckpt_rec_open_ptr->streamId = logStream->streamId;
>
> > -
>
> > - ckpt_rec_open_ptr->logFile =
>
> > - const_cast<char *>(logStream->fileName.c_str());
>
> > - ckpt_rec_open_ptr->logPath =
>
> > - const_cast<char *>(logStream->pathName.c_str());
>
> > - ckpt_rec_open_ptr->logFileCurrent =
>
> > - const_cast<char *>(logStream->logFileCurrent.c_str());
>
> > - ckpt_rec_open_ptr->fileFmt = logStream->logFileFormat;
>
> > - ckpt_rec_open_ptr->logStreamName =
>
> > - const_cast<char *>(logStream->name.c_str());
>
> > -
>
> > - ckpt_rec_open_ptr->maxFileSize = logStream->maxLogFileSize;
>
> > - ckpt_rec_open_ptr->maxLogRecordSize = logStream->fixedLogRecordSize;
>
> > - ckpt_rec_open_ptr->logFileFullAction = logStream->logFullAction;
>
> > - ckpt_rec_open_ptr->maxFilesRotated = logStream->maxFilesRotated;
>
> > - ckpt_rec_open_ptr->creationTimeStamp = logStream->creationTimeStamp;
>
> > - ckpt_rec_open_ptr->numOpeners = logStream->numOpeners;
>
> > -
>
> > - ckpt_rec_open_ptr->streamType = logStream->streamType;
>
> > - ckpt_rec_open_ptr->logRecordId = logStream->logRecordId;
>
> > -
>
> > - async_rc = lgs_ckpt_send_async(cb, ckpt_ptr, NCS_MBCSV_ACT_ADD);
>
> > - if (async_rc == NCSCC_RC_SUCCESS) {
>
> > - TRACE_4("REG_REC ASYNC UPDATE SEND SUCCESS...");
>
> > - }
>
> > - }
>
> > - TRACE_LEAVE2("async_rc = %d", async_rc);
>
> > - return async_rc;
>
> > -}
>
> > -
>
> > -/**
>
> > * Create a new application stream
>
> > *
>
> > * @param open_sync_param[in] Parameters used to create the stream
>
> > @@ -1226,8 +1160,9 @@ snd_rsp:
>
> > rc = lgs_mds_msg_send(cb, &msg, &evt->fr_dest, &evt->mds_ctxt,
>
> > MDS_SEND_PRIORITY_HIGH);
>
> >
>
> > + // Checkpoint the opened stream
>
> > if (ais_rv == SA_AIS_OK) {
>
> > - (void)lgs_ckpt_stream_open(cb, logStream, open_sync_param);
>
> > + (void)lgs_ckpt_stream_open(logStream, open_sync_param->client_id);
>
> > }
>
> >
>
> > // These memories are allocated in MDS log open decode callback.
>
> > diff --git a/src/log/logd/lgs_imm.cc b/src/log/logd/lgs_imm.cc
>
> > index caf0cc92a..2eb3b7544 100644
>
> > --- a/src/log/logd/lgs_imm.cc
>
> > +++ b/src/log/logd/lgs_imm.cc
>
> > @@ -318,43 +318,6 @@ static uint32_t ckpt_stream_config(log_stream_t
> *stream) {
>
> > }
>
> >
>
> > /**
>
> > - * Pack and send an open stream checkpoint using mbcsv
>
> > - * @param stream
>
> > - * @return uint32
>
> > - */
>
> > -static uint32_t ckpt_stream_open(log_stream_t *stream) {
>
> > - uint32_t rc;
>
> > - lgsv_ckpt_msg_v1_t ckpt_v1;
>
> > - lgsv_ckpt_msg_v2_t ckpt_v2;
>
> > - void *ckpt_ptr;
>
> > - lgs_ckpt_stream_open_t *stream_open_ptr;
>
> > - lgsv_ckpt_header_t *header_ptr;
>
> > -
>
> > - TRACE_ENTER();
>
> > -
>
> > - if (lgs_is_peer_v2()) {
>
> > - memset(&ckpt_v2, 0, sizeof(ckpt_v2));
>
> > - header_ptr = &ckpt_v2.header;
>
> > - stream_open_ptr = &ckpt_v2.ckpt_rec.stream_open;
>
> > - ckpt_ptr = &ckpt_v2;
>
> > - } else {
>
> > - memset(&ckpt_v1, 0, sizeof(ckpt_v1));
>
> > - header_ptr = &ckpt_v1.header;
>
> > - stream_open_ptr = &ckpt_v1.ckpt_rec.stream_open;
>
> > - ckpt_ptr = &ckpt_v1;
>
> > - }
>
> > - header_ptr->ckpt_rec_type = LGS_CKPT_OPEN_STREAM;
>
> > - header_ptr->num_ckpt_records = 1;
>
> > - header_ptr->data_len = 1;
>
> > -
>
> > - lgs_ckpt_stream_open_set(stream, stream_open_ptr);
>
> > - rc = lgs_ckpt_send_async(lgs_cb, ckpt_ptr, NCS_MBCSV_ACT_ADD);
>
> > -
>
> > - TRACE_LEAVE();
>
> > - return rc;
>
> > -}
>
> > -
>
> > -/**
>
> > * Pack and send a close stream checkpoint using mbcsv
>
> > * @param stream
>
> > * @param recType
>
> > @@ -2409,7 +2372,8 @@ static void stream_ccb_apply_create(const
> CcbUtilOperationData_t *opdata) {
>
> >
>
> > if ((rc = stream_create_and_configure1(opdata, &stream)) ==
> SA_AIS_OK) {
>
> > log_stream_open_fileinit(stream);
>
> > - ckpt_stream_open(stream);
>
> > + // Checkpoint the opened stream with invalid clientId (-1)
>
> > + lgs_ckpt_stream_open(stream, -1);
>
> > } else {
>
> > LOG_IN("Stream create and configure failed %d", rc);
>
> > }
>
> > diff --git a/src/log/logd/lgs_mbcsv.cc b/src/log/logd/lgs_mbcsv.cc
>
> > index 59232e138..b94c8cad2 100644
>
> > --- a/src/log/logd/lgs_mbcsv.cc
>
> > +++ b/src/log/logd/lgs_mbcsv.cc
>
> > @@ -670,16 +670,18 @@ static uint32_t
> ckpt_enc_cold_sync_data(lgs_cb_t *lgs_cb,
>
> > } /*End ckpt_enc_cold_sync_data() */
>
> >
>
> > /**
>
> > - * Set parameters for open stream
>
> > + * Set parameters for check-pointing the opened stream
>
> > *
>
> > * @param logStream
>
> > * @param stream_open
>
> > + * @param client_id
>
> > * @return
>
> > */
>
> > -uint32_t lgs_ckpt_stream_open_set(log_stream_t *logStream,
>
> > - lgs_ckpt_stream_open_t *stream_open) {
>
> > +void lgs_ckpt_stream_open_set(log_stream_t *logStream,
>
> > + lgs_ckpt_stream_open_t *stream_open,
>
> > + const uint32_t &client_id) {
>
> > memset(stream_open, 0, sizeof(lgs_ckpt_stream_open_t));
>
> > - stream_open->clientId = -1; /* not used in this message */
>
> > + stream_open->clientId = client_id;
>
> > stream_open->streamId = logStream->streamId;
>
> > stream_open->logFile = const_cast<char
> *>(logStream->fileName.c_str());
>
> > stream_open->logPath = const_cast<char
> *>(logStream->pathName.c_str());
>
> > @@ -697,8 +699,6 @@ uint32_t lgs_ckpt_stream_open_set(log_stream_t
> *logStream,
>
> > stream_open->numOpeners = logStream->numOpeners;
>
> > stream_open->streamType = logStream->streamType;
>
> > stream_open->logRecordId = logStream->logRecordId;
>
> > -
>
> > - return NCSCC_RC_SUCCESS;
>
> > }
>
> >
>
> > /**
>
> > @@ -737,7 +737,8 @@ static uint32_t edu_enc_streams(lgs_cb_t *cb,
> NCS_UBAID *uba) {
>
> > SaBoolT endloop = SA_FALSE, jstart = SA_TRUE;
>
> > while ((log_stream_rec = iterate_all_streams(endloop, jstart))
> && !endloop) {
>
> > jstart = SA_FALSE;
>
> > - lgs_ckpt_stream_open_set(log_stream_rec, ckpt_stream_rec);
>
> > + // Encode the stream with invalid clientId for cold sync
>
> > + lgs_ckpt_stream_open_set(log_stream_rec, ckpt_stream_rec, -1);
>
> > rc = m_NCS_EDU_EXEC(&cb->edu_hdl, edp_ed_open_stream_rec, uba,
>
> > EDP_OP_TYPE_ENC, ckpt_stream_rec, &ederror);
>
> >
>
> > diff --git a/src/log/logd/lgs_mbcsv.h b/src/log/logd/lgs_mbcsv.h
>
> > index b63b0cc2d..75a96deb6 100644
>
> > --- a/src/log/logd/lgs_mbcsv.h
>
> > +++ b/src/log/logd/lgs_mbcsv.h
>
> > @@ -113,8 +113,9 @@ bool lgs_is_peer_v6();
>
> > bool lgs_is_split_file_system();
>
> > uint32_t lgs_mbcsv_dispatch(NCS_MBCSV_HDL mbcsv_hdl);
>
> > void lgs_free_edu_mem(char *ptr);
>
> > -uint32_t lgs_ckpt_stream_open_set(log_stream_t *logStream,
>
> > - lgs_ckpt_stream_open_t *stream_open);
>
> > +void lgs_ckpt_stream_open_set(log_stream_t *logStream,
>
> > + lgs_ckpt_stream_open_t *stream_open,
>
> > + const uint32_t &client_id);
>
> > uint32_t edp_ed_header_rec(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn,
> NCSCONTEXT ptr,
>
> > uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env,
>
> > EDP_OP_TYPE op, EDU_ERR *o_err);
>
> > diff --git a/src/log/logd/lgs_stream.cc b/src/log/logd/lgs_stream.cc
>
> > index 69da6c10e..26014c936 100644
>
> > --- a/src/log/logd/lgs_stream.cc
>
> > +++ b/src/log/logd/lgs_stream.cc
>
> > @@ -31,9 +31,11 @@
>
> > #include <algorithm>
>
> >
>
> > #include "log/logd/lgs.h"
>
> > -#include "lgs_config.h"
>
> > +#include "log/logd/lgs_config.h"
>
> > #include "log/logd/lgs_file.h"
>
> > #include "log/logd/lgs_filehdl.h"
>
> > +#include "log/logd/lgs_mbcsv_v1.h"
>
> > +#include "log/logd/lgs_mbcsv_v2.h"
>
> > #include "base/osaf_time.h"
>
> > #include "osaf/immutil/immutil.h"
>
> >
>
> > @@ -1597,3 +1599,40 @@ void log_stream_form_dest_names(log_stream_t
> *stream) {
>
> > }
>
> > stream->stb_dest_names = output;
>
> > }
>
> > +
>
> > +/**
>
> > +* Check-pointing the opened stream
>
> > +* @param stream
>
> > +* @param client_id
>
> > +* @return
>
> > +*/
>
> > +void lgs_ckpt_stream_open(log_stream_t *stream, const uint32_t
> &client_id) {
>
> > + uint32_t rc;
>
> > + lgsv_ckpt_msg_v1_t ckpt_v1;
>
> > + lgsv_ckpt_msg_v2_t ckpt_v2;
>
> > + void *ckpt_ptr;
>
> > + lgs_ckpt_stream_open_t *stream_open_ptr;
>
> > + lgsv_ckpt_header_t *header_ptr;
>
> > +
>
> > + TRACE_ENTER();
>
> > +
>
> > + if (lgs_is_peer_v2()) {
>
> > + memset(&ckpt_v2, 0, sizeof(ckpt_v2));
>
> > + header_ptr = &ckpt_v2.header;
>
> > + stream_open_ptr = &ckpt_v2.ckpt_rec.stream_open;
>
> > + ckpt_ptr = &ckpt_v2;
>
> > + } else {
>
> > + memset(&ckpt_v1, 0, sizeof(ckpt_v1));
>
> > + header_ptr = &ckpt_v1.header;
>
> > + stream_open_ptr = &ckpt_v1.ckpt_rec.stream_open;
>
> > + ckpt_ptr = &ckpt_v1;
>
> > + }
>
> > + header_ptr->ckpt_rec_type = LGS_CKPT_OPEN_STREAM;
>
> > + header_ptr->num_ckpt_records = 1;
>
> > + header_ptr->data_len = 1;
>
> > +
>
> > + lgs_ckpt_stream_open_set(stream, stream_open_ptr, client_id);
>
> > + rc = lgs_ckpt_send_async(lgs_cb, ckpt_ptr, NCS_MBCSV_ACT_ADD);
>
> > +
>
> > + TRACE_LEAVE2("Check-pointing the opened stream: rc=%d", rc);
>
> > +}
>
> > diff --git a/src/log/logd/lgs_stream.h b/src/log/logd/lgs_stream.h
>
> > index 1ed4b1571..0ef5b7a11 100644
>
> > --- a/src/log/logd/lgs_stream.h
>
> > +++ b/src/log/logd/lgs_stream.h
>
> > @@ -143,4 +143,6 @@ void log_stream_delete_dest_name(log_stream_t
> *stream,
>
> > const std::vector<std::string> &names);
>
> > void log_stream_form_dest_names(log_stream_t *stream);
>
> >
>
> > +void lgs_ckpt_stream_open(log_stream_t *stream, const uint32_t
> &client_id);
>
> > +
>
> > #endif // LOG_LOGD_LGS_STREAM_H_
>
------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel