Handling of checkpoint for stream open is in serveral places. Handling of
checkpoint
is called in proc_stream_open_msg forgot to add checkpoint of destination name.
Refactor so that handling of checkpoint data for stream open is done in one
place
and destination name was already added in checkpoint data at this place
---
src/log/logd/lgs_evt.cc | 69 ++--------------------------------------------
src/log/logd/lgs_imm.cc | 40 ++-------------------------
src/log/logd/lgs_mbcsv.cc | 15 +++++-----
src/log/logd/lgs_mbcsv.h | 5 ++--
src/log/logd/lgs_stream.cc | 41 ++++++++++++++++++++++++++-
src/log/logd/lgs_stream.h | 2 ++
6 files changed, 57 insertions(+), 115 deletions(-)
diff --git a/src/log/logd/lgs_evt.cc b/src/log/logd/lgs_evt.cc
index 6972efe55..beb46e7a7 100644
--- a/src/log/logd/lgs_evt.cc
+++ b/src/log/logd/lgs_evt.cc
@@ -821,72 +821,6 @@ snd_rsp:
}
/**
- * Stream open checkpointing
- * @param cb
- * @param logStream
- * @param open_sync_param
- * @return
- */
-static uint32_t lgs_ckpt_stream_open(lgs_cb_t *cb, log_stream_t *logStream,
- lgsv_stream_open_req_t *open_sync_param) {
- uint32_t async_rc = NCSCC_RC_SUCCESS;
- lgsv_ckpt_msg_v1_t ckpt_v1;
- lgsv_ckpt_msg_v2_t ckpt_v2;
- void *ckpt_ptr;
- lgsv_ckpt_header_t *header_ptr;
- lgs_ckpt_stream_open_t *ckpt_rec_open_ptr;
-
- TRACE_ENTER();
-
- if (lgs_is_peer_v2()) {
- memset(&ckpt_v2, 0, sizeof(ckpt_v2));
- header_ptr = &ckpt_v2.header;
- ckpt_rec_open_ptr = &ckpt_v2.ckpt_rec.stream_open;
- ckpt_ptr = &ckpt_v2;
- } else {
- memset(&ckpt_v1, 0, sizeof(ckpt_v1));
- header_ptr = &ckpt_v1.header;
- ckpt_rec_open_ptr = &ckpt_v1.ckpt_rec.stream_open;
- ckpt_ptr = &ckpt_v1;
- }
-
- if (cb->ha_state == SA_AMF_HA_ACTIVE) {
- header_ptr->ckpt_rec_type = LGS_CKPT_OPEN_STREAM;
- header_ptr->num_ckpt_records = 1;
- header_ptr->data_len = 1;
- ckpt_rec_open_ptr->clientId = open_sync_param->client_id;
- ckpt_rec_open_ptr->streamId = logStream->streamId;
-
- ckpt_rec_open_ptr->logFile =
- const_cast<char *>(logStream->fileName.c_str());
- ckpt_rec_open_ptr->logPath =
- const_cast<char *>(logStream->pathName.c_str());
- ckpt_rec_open_ptr->logFileCurrent =
- const_cast<char *>(logStream->logFileCurrent.c_str());
- ckpt_rec_open_ptr->fileFmt = logStream->logFileFormat;
- ckpt_rec_open_ptr->logStreamName =
- const_cast<char *>(logStream->name.c_str());
-
- ckpt_rec_open_ptr->maxFileSize = logStream->maxLogFileSize;
- ckpt_rec_open_ptr->maxLogRecordSize = logStream->fixedLogRecordSize;
- ckpt_rec_open_ptr->logFileFullAction = logStream->logFullAction;
- ckpt_rec_open_ptr->maxFilesRotated = logStream->maxFilesRotated;
- ckpt_rec_open_ptr->creationTimeStamp = logStream->creationTimeStamp;
- ckpt_rec_open_ptr->numOpeners = logStream->numOpeners;
-
- ckpt_rec_open_ptr->streamType = logStream->streamType;
- ckpt_rec_open_ptr->logRecordId = logStream->logRecordId;
-
- async_rc = lgs_ckpt_send_async(cb, ckpt_ptr, NCS_MBCSV_ACT_ADD);
- if (async_rc == NCSCC_RC_SUCCESS) {
- TRACE_4("REG_REC ASYNC UPDATE SEND SUCCESS...");
- }
- }
- TRACE_LEAVE2("async_rc = %d", async_rc);
- return async_rc;
-}
-
-/**
* Create a new application stream
*
* @param open_sync_param[in] Parameters used to create the stream
@@ -1226,8 +1160,9 @@ snd_rsp:
rc = lgs_mds_msg_send(cb, &msg, &evt->fr_dest, &evt->mds_ctxt,
MDS_SEND_PRIORITY_HIGH);
+ // Checkpoint the opened stream
if (ais_rv == SA_AIS_OK) {
- (void)lgs_ckpt_stream_open(cb, logStream, open_sync_param);
+ (void)lgs_ckpt_stream_open(logStream, open_sync_param->client_id);
}
// These memories are allocated in MDS log open decode callback.
diff --git a/src/log/logd/lgs_imm.cc b/src/log/logd/lgs_imm.cc
index caf0cc92a..2eb3b7544 100644
--- a/src/log/logd/lgs_imm.cc
+++ b/src/log/logd/lgs_imm.cc
@@ -318,43 +318,6 @@ static uint32_t ckpt_stream_config(log_stream_t *stream) {
}
/**
- * Pack and send an open stream checkpoint using mbcsv
- * @param stream
- * @return uint32
- */
-static uint32_t ckpt_stream_open(log_stream_t *stream) {
- uint32_t rc;
- lgsv_ckpt_msg_v1_t ckpt_v1;
- lgsv_ckpt_msg_v2_t ckpt_v2;
- void *ckpt_ptr;
- lgs_ckpt_stream_open_t *stream_open_ptr;
- lgsv_ckpt_header_t *header_ptr;
-
- TRACE_ENTER();
-
- if (lgs_is_peer_v2()) {
- memset(&ckpt_v2, 0, sizeof(ckpt_v2));
- header_ptr = &ckpt_v2.header;
- stream_open_ptr = &ckpt_v2.ckpt_rec.stream_open;
- ckpt_ptr = &ckpt_v2;
- } else {
- memset(&ckpt_v1, 0, sizeof(ckpt_v1));
- header_ptr = &ckpt_v1.header;
- stream_open_ptr = &ckpt_v1.ckpt_rec.stream_open;
- ckpt_ptr = &ckpt_v1;
- }
- header_ptr->ckpt_rec_type = LGS_CKPT_OPEN_STREAM;
- header_ptr->num_ckpt_records = 1;
- header_ptr->data_len = 1;
-
- lgs_ckpt_stream_open_set(stream, stream_open_ptr);
- rc = lgs_ckpt_send_async(lgs_cb, ckpt_ptr, NCS_MBCSV_ACT_ADD);
-
- TRACE_LEAVE();
- return rc;
-}
-
-/**
* Pack and send a close stream checkpoint using mbcsv
* @param stream
* @param recType
@@ -2409,7 +2372,8 @@ static void stream_ccb_apply_create(const
CcbUtilOperationData_t *opdata) {
if ((rc = stream_create_and_configure1(opdata, &stream)) == SA_AIS_OK) {
log_stream_open_fileinit(stream);
- ckpt_stream_open(stream);
+ // Checkpoint the opened stream with invalid clientId (-1)
+ lgs_ckpt_stream_open(stream, -1);
} else {
LOG_IN("Stream create and configure failed %d", rc);
}
diff --git a/src/log/logd/lgs_mbcsv.cc b/src/log/logd/lgs_mbcsv.cc
index 59232e138..b94c8cad2 100644
--- a/src/log/logd/lgs_mbcsv.cc
+++ b/src/log/logd/lgs_mbcsv.cc
@@ -670,16 +670,18 @@ static uint32_t ckpt_enc_cold_sync_data(lgs_cb_t *lgs_cb,
} /*End ckpt_enc_cold_sync_data() */
/**
- * Set parameters for open stream
+ * Set parameters for check-pointing the opened stream
*
* @param logStream
* @param stream_open
+ * @param client_id
* @return
*/
-uint32_t lgs_ckpt_stream_open_set(log_stream_t *logStream,
- lgs_ckpt_stream_open_t *stream_open) {
+void lgs_ckpt_stream_open_set(log_stream_t *logStream,
+ lgs_ckpt_stream_open_t *stream_open,
+ const uint32_t &client_id) {
memset(stream_open, 0, sizeof(lgs_ckpt_stream_open_t));
- stream_open->clientId = -1; /* not used in this message */
+ stream_open->clientId = client_id;
stream_open->streamId = logStream->streamId;
stream_open->logFile = const_cast<char *>(logStream->fileName.c_str());
stream_open->logPath = const_cast<char *>(logStream->pathName.c_str());
@@ -697,8 +699,6 @@ uint32_t lgs_ckpt_stream_open_set(log_stream_t *logStream,
stream_open->numOpeners = logStream->numOpeners;
stream_open->streamType = logStream->streamType;
stream_open->logRecordId = logStream->logRecordId;
-
- return NCSCC_RC_SUCCESS;
}
/**
@@ -737,7 +737,8 @@ static uint32_t edu_enc_streams(lgs_cb_t *cb, NCS_UBAID
*uba) {
SaBoolT endloop = SA_FALSE, jstart = SA_TRUE;
while ((log_stream_rec = iterate_all_streams(endloop, jstart)) && !endloop) {
jstart = SA_FALSE;
- lgs_ckpt_stream_open_set(log_stream_rec, ckpt_stream_rec);
+ // Encode the stream with invalid clientId for cold sync
+ lgs_ckpt_stream_open_set(log_stream_rec, ckpt_stream_rec, -1);
rc = m_NCS_EDU_EXEC(&cb->edu_hdl, edp_ed_open_stream_rec, uba,
EDP_OP_TYPE_ENC, ckpt_stream_rec, &ederror);
diff --git a/src/log/logd/lgs_mbcsv.h b/src/log/logd/lgs_mbcsv.h
index b63b0cc2d..75a96deb6 100644
--- a/src/log/logd/lgs_mbcsv.h
+++ b/src/log/logd/lgs_mbcsv.h
@@ -113,8 +113,9 @@ bool lgs_is_peer_v6();
bool lgs_is_split_file_system();
uint32_t lgs_mbcsv_dispatch(NCS_MBCSV_HDL mbcsv_hdl);
void lgs_free_edu_mem(char *ptr);
-uint32_t lgs_ckpt_stream_open_set(log_stream_t *logStream,
- lgs_ckpt_stream_open_t *stream_open);
+void lgs_ckpt_stream_open_set(log_stream_t *logStream,
+ lgs_ckpt_stream_open_t *stream_open,
+ const uint32_t &client_id);
uint32_t edp_ed_header_rec(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT ptr,
uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env,
EDP_OP_TYPE op, EDU_ERR *o_err);
diff --git a/src/log/logd/lgs_stream.cc b/src/log/logd/lgs_stream.cc
index 69da6c10e..26014c936 100644
--- a/src/log/logd/lgs_stream.cc
+++ b/src/log/logd/lgs_stream.cc
@@ -31,9 +31,11 @@
#include <algorithm>
#include "log/logd/lgs.h"
-#include "lgs_config.h"
+#include "log/logd/lgs_config.h"
#include "log/logd/lgs_file.h"
#include "log/logd/lgs_filehdl.h"
+#include "log/logd/lgs_mbcsv_v1.h"
+#include "log/logd/lgs_mbcsv_v2.h"
#include "base/osaf_time.h"
#include "osaf/immutil/immutil.h"
@@ -1597,3 +1599,40 @@ void log_stream_form_dest_names(log_stream_t *stream) {
}
stream->stb_dest_names = output;
}
+
+/**
+* Check-pointing the opened stream
+* @param stream
+* @param client_id
+* @return
+*/
+void lgs_ckpt_stream_open(log_stream_t *stream, const uint32_t &client_id) {
+ uint32_t rc;
+ lgsv_ckpt_msg_v1_t ckpt_v1;
+ lgsv_ckpt_msg_v2_t ckpt_v2;
+ void *ckpt_ptr;
+ lgs_ckpt_stream_open_t *stream_open_ptr;
+ lgsv_ckpt_header_t *header_ptr;
+
+ TRACE_ENTER();
+
+ if (lgs_is_peer_v2()) {
+ memset(&ckpt_v2, 0, sizeof(ckpt_v2));
+ header_ptr = &ckpt_v2.header;
+ stream_open_ptr = &ckpt_v2.ckpt_rec.stream_open;
+ ckpt_ptr = &ckpt_v2;
+ } else {
+ memset(&ckpt_v1, 0, sizeof(ckpt_v1));
+ header_ptr = &ckpt_v1.header;
+ stream_open_ptr = &ckpt_v1.ckpt_rec.stream_open;
+ ckpt_ptr = &ckpt_v1;
+ }
+ header_ptr->ckpt_rec_type = LGS_CKPT_OPEN_STREAM;
+ header_ptr->num_ckpt_records = 1;
+ header_ptr->data_len = 1;
+
+ lgs_ckpt_stream_open_set(stream, stream_open_ptr, client_id);
+ rc = lgs_ckpt_send_async(lgs_cb, ckpt_ptr, NCS_MBCSV_ACT_ADD);
+
+ TRACE_LEAVE2("Check-pointing the opened stream: rc=%d", rc);
+}
diff --git a/src/log/logd/lgs_stream.h b/src/log/logd/lgs_stream.h
index 1ed4b1571..0ef5b7a11 100644
--- a/src/log/logd/lgs_stream.h
+++ b/src/log/logd/lgs_stream.h
@@ -143,4 +143,6 @@ void log_stream_delete_dest_name(log_stream_t *stream,
const std::vector<std::string> &names);
void log_stream_form_dest_names(log_stream_t *stream);
+void lgs_ckpt_stream_open(log_stream_t *stream, const uint32_t &client_id);
+
#endif // LOG_LOGD_LGS_STREAM_H_
--
2.11.0
------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel