In order to improve the resilience of the OpenSAF LOG service when the underlying file system is unresponsive, a queue is introduced to hold async write requests for up to a configurable time, in the range of 15 to 30 seconds.
The readiness of the I/O thread will periodically check, and if it turns to ready state, the front element will go first. Returns SA_AIS_ERR_TRY_AGAIN to client if the element stays in the queue longer than the setting time. The queue capacity and the resilient time are configurable via the attributes: `logMaxPendingWriteRequests` and `logResilienceTimeout`. In default, this feature is disabled to keep log server backward compatible. --- src/log/Makefile.am | 21 +- src/log/config/logsv_classes.xml | 43 ++- src/log/logd/lgs_cache.cc | 469 +++++++++++++++++++++++++++++++ src/log/logd/lgs_cache.h | 287 +++++++++++++++++++ src/log/logd/lgs_config.cc | 78 ++++- src/log/logd/lgs_config.h | 10 +- src/log/logd/lgs_evt.cc | 161 +++-------- src/log/logd/lgs_evt.h | 10 + src/log/logd/lgs_file.cc | 8 +- src/log/logd/lgs_filehdl.cc | 58 ++-- src/log/logd/lgs_imm.cc | 40 ++- src/log/logd/lgs_main.cc | 24 +- src/log/logd/lgs_mbcsv.cc | 447 +++++++++++++++++++++++------ src/log/logd/lgs_mbcsv.h | 19 +- src/log/logd/lgs_mbcsv_cache.cc | 372 ++++++++++++++++++++++++ src/log/logd/lgs_mbcsv_cache.h | 110 ++++++++ src/log/logd/lgs_mbcsv_v1.cc | 1 + src/log/logd/lgs_mbcsv_v2.cc | 2 + 18 files changed, 1889 insertions(+), 271 deletions(-) create mode 100644 src/log/logd/lgs_cache.cc create mode 100644 src/log/logd/lgs_cache.h create mode 100644 src/log/logd/lgs_mbcsv_cache.cc create mode 100644 src/log/logd/lgs_mbcsv_cache.h diff --git a/src/log/Makefile.am b/src/log/Makefile.am index f63a4a053..3367ef4f6 100644 --- a/src/log/Makefile.am +++ b/src/log/Makefile.am @@ -95,7 +95,9 @@ noinst_HEADERS += \ src/log/logd/lgs_nildest.h \ src/log/logd/lgs_unixsock_dest.h \ src/log/logd/lgs_common.h \ - src/log/logd/lgs_amf.h + src/log/logd/lgs_amf.h \ + src/log/logd/lgs_cache.h \ + src/log/logd/lgs_mbcsv_cache.h bin_PROGRAMS += bin/saflogger @@ -123,6 +125,15 @@ bin_osaflogd_CPPFLAGS = \ -DSA_EXTENDED_NAME_SOURCE \ $(AM_CPPFLAGS) +# Enable this flag to simulate the case that file system is 
unresponsive +# during write log record. Mainly for testing the following enhancement: +# log: improve the resilience of log service [#3116]. +# When enabled, log handle thread will be suspended 17 seconds every 02 write +# requests and only take affect if the `logMaxPendingWriteRequests` is set +# to an non-zero value. +bin_osaflogd_CPPFLAGS += -DSIMULATE_NFS_UNRESPONSE + + bin_osaflogd_SOURCES = \ src/log/logd/lgs_amf.cc \ src/log/logd/lgs_clm.cc \ @@ -147,7 +158,9 @@ bin_osaflogd_SOURCES = \ src/log/logd/lgs_util.cc \ src/log/logd/lgs_dest.cc \ src/log/logd/lgs_nildest.cc \ - src/log/logd/lgs_unixsock_dest.cc + src/log/logd/lgs_unixsock_dest.cc \ + src/log/logd/lgs_cache.cc \ + src/log/logd/lgs_mbcsv_cache.cc bin_osaflogd_LDADD = \ lib/libosaf_common.la \ @@ -183,6 +196,10 @@ bin_logtest_CPPFLAGS = \ -DSA_EXTENDED_NAME_SOURCE \ $(AM_CPPFLAGS) +# Enable this flag to add test cases for following enhancement: +# log: improve the resilience of log service [#3116]. +bin_logtest_CPPFLAGS += -DSIMULATE_NFS_UNRESPONSE + bin_logtest_SOURCES = \ src/log/apitest/logtest.c \ src/log/apitest/logutil.c \ diff --git a/src/log/config/logsv_classes.xml b/src/log/config/logsv_classes.xml index 9359823ff..084e8915d 100644 --- a/src/log/config/logsv_classes.xml +++ b/src/log/config/logsv_classes.xml @@ -195,7 +195,7 @@ to ensure that default global values in the implementation are also changed acco <type>SA_UINT32_T</type> <category>SA_CONFIG</category> <flag>SA_WRITABLE</flag> - <default-value>1024</default-value> + <default-value>1024</default-value> </attr> <attr> <name>logStreamFileFormat</name> @@ -208,42 +208,42 @@ to ensure that default global values in the implementation are also changed acco <type>SA_UINT32_T</type> <category>SA_CONFIG</category> <flag>SA_WRITABLE</flag> - <default-value>0</default-value> + <default-value>0</default-value> </attr> <attr> <name>logStreamSystemLowLimit</name> <type>SA_UINT32_T</type> <category>SA_CONFIG</category> <flag>SA_WRITABLE</flag> - 
<default-value>0</default-value> + <default-value>0</default-value> </attr> <attr> <name>logStreamAppHighLimit</name> <type>SA_UINT32_T</type> <category>SA_CONFIG</category> <flag>SA_WRITABLE</flag> - <default-value>0</default-value> + <default-value>0</default-value> </attr> <attr> <name>logStreamAppLowLimit</name> <type>SA_UINT32_T</type> <category>SA_CONFIG</category> <flag>SA_WRITABLE</flag> - <default-value>0</default-value> + <default-value>0</default-value> </attr> <attr> <name>logMaxApplicationStreams</name> <type>SA_UINT32_T</type> <category>SA_CONFIG</category> <flag>SA_WRITABLE</flag> - <default-value>64</default-value> + <default-value>64</default-value> </attr> <attr> <name>logFileIoTimeout</name> <type>SA_UINT32_T</type> <category>SA_CONFIG</category> <flag>SA_WRITABLE</flag> - <default-value>500</default-value> + <default-value>500</default-value> </attr> <attr> <name>logFileSysConfig</name> @@ -266,6 +266,20 @@ to ensure that default global values in the implementation are also changed acco <flag>SA_MULTI_VALUE</flag> <flag>SA_NO_DUPLICATES</flag> </attr> + <attr> + <name>logMaxPendingWriteRequests</name> + <type>SA_UINT32_T</type> + <category>SA_CONFIG</category> + <flag>SA_WRITABLE</flag> + <default-value>0</default-value> + </attr> + <attr> + <name>logResilienceTimeout</name> + <type>SA_UINT32_T</type> + <category>SA_CONFIG</category> + <flag>SA_WRITABLE</flag> + <default-value>15</default-value> + </attr> </class> <class name="OpenSafLogCurrentConfig"> <category>SA_RUNTIME</category> @@ -342,5 +356,20 @@ to ensure that default global values in the implementation are also changed acco <category>SA_RUNTIME</category> <flag>SA_MULTI_VALUE</flag> </attr> + <attr> + <name>logMaxPendingWriteRequests</name> + <type>SA_UINT32_T</type> + <category>SA_RUNTIME</category> + </attr> + <attr> + <name>logResilienceTimeout</name> + <type>SA_UINT32_T</type> + <category>SA_RUNTIME</category> + </attr> + <attr> + <name>logCurrentPendingWriteRequests</name> + 
<type>SA_UINT32_T</type> + <category>SA_RUNTIME</category> + </attr> </class> </imm:IMM-contents> diff --git a/src/log/logd/lgs_cache.cc b/src/log/logd/lgs_cache.cc new file mode 100644 index 000000000..898185fc8 --- /dev/null +++ b/src/log/logd/lgs_cache.cc @@ -0,0 +1,469 @@ +/* -*- OpenSAF -*- + * + * Copyright Ericsson AB 2019 - All Rights Reserved. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed + * under the GNU Lesser General Public License Version 2.1, February 1999. + * The complete license can be accessed from the following location: + * http://opensource.org/licenses/lgpl-license.php + * See the Copying file included with the OpenSAF distribution for full + * licensing terms. + * + * Author(s): Ericsson AB + * + */ + +#include "log/logd/lgs_cache.h" + +#include "log/logd/lgs_dest.h" +#include "log/logd/lgs_mbcsv_cache.h" +#include "log/logd/lgs_evt.h" +#include "log/logd/lgs_evt.h" +#include "log/logd/lgs_mbcsv.h" +#include "log/logd/lgs_config.h" +#include "base/time.h" + +// The unique id of each queue element. Using this sequence id +// to check if the standby is kept the queue in sync with the active. +static size_t gl_seq_num = 0; + +Cache::WriteAsyncInfo::WriteAsyncInfo(WriteAsyncParam* param, MDS_DEST fr_dest, + const char* node_name) { + TRACE_ENTER(); + invocation = param->invocation; + ack_flags = param->ack_flags; + client_id = param->client_id; + stream_id = param->lstr_id; + severity = 0; + dest = fr_dest; + log_stamp = 0; + svc_name = nullptr; + from_node = nullptr; + // These following info is only for streaming, hence is not valid + // for alarm & notif streams. 
+ log_stream_t* str = stream(); + if ((str->name != SA_LOG_STREAM_ALARM) + && (str->name != SA_LOG_STREAM_NOTIFICATION)) { + severity = param->logRecord->logHeader.genericHdr.logSeverity; + svc_name = strdup(osaf_extended_name_borrow( + param->logRecord->logHeader.genericHdr.logSvcUsrName)); + log_stamp = param->logRecord->logTimeStamp; + from_node = strdup(node_name); + } +} + +Cache::WriteAsyncInfo::WriteAsyncInfo(const CkptPushAsync* data) { + TRACE_ENTER(); + invocation = data->invocation; + ack_flags = data->ack_flags; + client_id = data->client_id; + stream_id = data->stream_id; + log_stamp = data->log_stamp; + severity = data->severity; + dest = data->dest; + svc_name = nullptr; + from_node = nullptr; + if (data->svc_name) svc_name = strdup(data->svc_name); + if (data->from_node) from_node = strdup(data->from_node); +} + +std::string Cache::WriteAsyncInfo::info() const { + TRACE_ENTER(); + char output[256]; + snprintf(output, sizeof(output), "invocation = %llu, client(%s) = %" PRIx64, + invocation, from_node == nullptr ? "(null)" : from_node, dest); + LOG_NO("info = %s", output); + return std::string{output}; +} + +void Cache::WriteAsyncInfo::Dump() const { + LOG_NO("invocation: %llu", invocation); + LOG_NO("client_id: %u", client_id); + LOG_NO("stream_id: %u", stream_id); + LOG_NO("svc_name: %s", svc_name == nullptr ? "(null)" : svc_name); + LOG_NO("from_node: %s", from_node == nullptr ? 
"(null)" : from_node); +} + +void Cache::WriteAsyncInfo::CloneData(CkptPushAsync* output) const { + TRACE_ENTER(); + output->invocation = invocation; + output->ack_flags = ack_flags; + output->client_id = client_id; + output->stream_id = stream_id; + output->svc_name = svc_name; + output->log_stamp = log_stamp; + output->severity = severity; + output->dest = dest; + output->from_node = from_node; +} + +Cache::Data::Data(std::shared_ptr<WriteAsyncInfo> info, + char* log_record, int size) + : param_{info}, log_record_{log_record}, size_{size} { + queue_at_ = base::TimespecToNanos(base::ReadMonotonicClock()); + seq_id_ = gl_seq_num++; +} + +Cache::Data::Data(const CkptPushAsync* data) { + TRACE_ENTER(); + param_ = std::make_shared<WriteAsyncInfo>(data); + assert(param_); + queue_at_ = data->queue_at; + seq_id_ = data->seq_id; + log_record_ = strdup(data->log_record); + size_ = strlen(log_record_); +} + +void Cache::Data::Dump() const { + LOG_NO("- Cache::Data - "); + LOG_NO("log_record: %s", log_record_); + LOG_NO("seq_id_: %" PRIu64, seq_id_); + LOG_NO("Queue at: %" PRIu64, queue_at_); + param_->Dump(); +} + +void Cache::Data::Streaming() const { + TRACE_ENTER(); + log_stream_t* stream = param_->stream(); + if (stream == nullptr) return; + + // Streaming does not support alarm/notif streams. + if ((stream->name == SA_LOG_STREAM_ALARM) || + (stream->name == SA_LOG_STREAM_NOTIFICATION)) { + return; + } + + // Packing Record data that is carring necessary information + // to form RFC5424 syslog msg, and sends to destination name(s). 
+ RecordData data{}; + timespec time; + data.name = stream->name.c_str(); + data.logrec = log_record_; + data.networkname = lgs_get_networkname().c_str(); + data.msgid = stream->rfc5424MsgId.c_str(); + data.isRtStream = stream->isRtStream; + data.recordId = stream->logRecordId; + data.hostname = param_->from_node; + data.appname = param_->svc_name; + data.sev = param_->severity; + time.tv_sec = (param_->log_stamp / (SaTimeT)SA_TIME_ONE_SECOND); + time.tv_nsec = (param_->log_stamp % (SaTimeT)SA_TIME_ONE_SECOND); + data.time = time; + WriteToDestination(data, stream->dest_names); +} + +bool Cache::Data::is_overdue() const { + uint32_t max_time = Cache::instance()->timeout(); + timespec current = base::ReadMonotonicClock(); + timespec queue_at = base::NanosToTimespec(queue_at_); + timespec max_resilience{static_cast<time_t>(max_time), 0}; + return (current - queue_at > max_resilience); +} + +bool Cache::Data::is_valid(std::string* reason) const { + if (is_stream_open() == false) { + *reason = "the log stream has been closed"; + return false; + } + if (is_overdue() == true) { + *reason = "the record is overdue (stream: " + param_->stream()->name + ")"; + return false; + } + return true; +} + +void Cache::Data::CloneData(CkptPushAsync* output) const { + TRACE_ENTER(); + param_->CloneData(output); + output->queue_at = queue_at_; + output->seq_id = seq_id_; + output->log_record = log_record_; +} + +int Cache::Data::SyncPushWithStandby() const { + TRACE_ENTER(); + assert(is_active() == true && "This instance does not run with active role"); + if (lgs_is_peer_v8() == false) return NCSCC_RC_SUCCESS; + lgsv_ckpt_msg_v8_t ckpt_v8; + void* ckpt_data; + memset(&ckpt_v8, 0, sizeof(ckpt_v8)); + ckpt_v8.header.ckpt_rec_type = LGS_CKPT_PUSH_ASYNC; + ckpt_v8.header.num_ckpt_records = 1; + ckpt_v8.header.data_len = 1; + auto data = &ckpt_v8.ckpt_rec.push_async; + CloneData(data); + ckpt_data = &ckpt_v8; + return lgs_ckpt_send_async(lgs_cb, ckpt_data, NCS_MBCSV_ACT_ADD); +} + +int 
Cache::Data::SyncPopWithStandby() const { + TRACE_ENTER(); + assert(is_active() == true && "This instance does not run with active role"); + if (lgs_is_peer_v8() == false) return NCSCC_RC_SUCCESS; + lgsv_ckpt_msg_v8_t ckpt_v8; + memset(&ckpt_v8, 0, sizeof(ckpt_v8)); + ckpt_v8.header.ckpt_rec_type = LGS_CKPT_POP_ASYNC; + ckpt_v8.header.num_ckpt_records = 1; + ckpt_v8.header.data_len = 1; + CkptPopAsync* data = &ckpt_v8.ckpt_rec.pop_async; + data->seq_id = seq_id_; + return lgs_ckpt_send_async(lgs_cb, &ckpt_v8, NCS_MBCSV_ACT_ADD); +} + +int Cache::Data::SyncWriteWithStandby() const { + TRACE_ENTER(); + assert(is_active() == true && "This instance does not run with active role"); + if (lgs_is_peer_v8() == false) return NCSCC_RC_SUCCESS; + log_stream_t* stream = param_->stream(); + if (stream == nullptr) { + LOG_NO("The stream id (%u) is closed. Drop the write sync.", + param_->stream_id); + return NCSCC_RC_SUCCESS; + } + lgs_ckpt_log_async(stream, log_record_); + return NCSCC_RC_SUCCESS; +} + +int Cache::Data::SyncPopAndWriteWithStandby() const { + TRACE_ENTER(); + assert(is_active() == true && "This instance does not run with active role"); + log_stream_t* stream = param_->stream(); + if (stream == nullptr) { + LOG_NO("The stream id (%u) is closed. 
Drop the pop&write sync.", + param_->stream_id); + return NCSCC_RC_SUCCESS; + } + lgsv_ckpt_msg_v8_t ckpt_v8; + memset(&ckpt_v8, 0, sizeof(ckpt_v8)); + ckpt_v8.header.ckpt_rec_type = LGS_CKPT_POP_WRITE_ASYNC; + ckpt_v8.header.num_ckpt_records = 1; + ckpt_v8.header.data_len = 1; + auto data = &ckpt_v8.ckpt_rec.pop_and_write_async; + data->log_record = log_record_; + data->stream_id = stream->streamId; + data->record_id = stream->logRecordId; + data->file_size = stream->curFileSize; + data->log_file = const_cast<char*>(stream->logFileCurrent.c_str()); + data->timestamp = stream->act_last_close_timestamp; + data->seq_id = seq_id_; + return lgs_ckpt_send_async(lgs_cb, &ckpt_v8, NCS_MBCSV_ACT_ADD); +} + +int Cache::Data::Write() const { + TRACE_ENTER(); + log_stream_t* stream = param_->stream(); + assert(stream && "log stream is nullptr"); + return log_stream_write_h(stream, log_record_, size_); +} + +void Cache::Data::AckToClient(SaAisErrorT code) const { + TRACE_ENTER(); + assert(is_active() == true && "This instance does not run with active role"); + if (is_client_alive() == false || + param_->ack_flags != SA_LOG_RECORD_WRITE_ACK) return; + lgs_send_write_log_ack(param_->client_id, param_->invocation, + code, param_->dest); +} + +int Cache::EncodeColdSync(NCS_UBAID* uba) const { + TRACE_ENTER(); + assert(is_active() == true && "This instance does not run with active role"); + if (lgs_is_peer_v8() == false) return NCSCC_RC_SUCCESS; + + uint8_t* pheader = ncs_enc_reserve_space(uba, sizeof(lgsv_ckpt_header_t)); + if (pheader == nullptr) { + LOG_NO("Cache::ColdSync failed."); + return EDU_ERR_MEM_FAIL; + } + ncs_enc_claim_space(uba, sizeof(lgsv_ckpt_header_t)); + + EDU_ERR ederror; + uint32_t num_rec = 0; + for (const auto& e : pending_write_async_) { + CkptPushAsync data; + e->CloneData(&data); + int rc = m_NCS_EDU_EXEC(&lgs_cb->edu_hdl, EncodeDecodePushAsync, uba, + EDP_OP_TYPE_ENC, &data, &ederror); + if (rc != NCSCC_RC_SUCCESS) { + 
m_NCS_EDU_PRINT_ERROR_STRING(ederror); + return rc; + } + num_rec++; + } + lgsv_ckpt_header_t ckpt_hdr; + memset(&ckpt_hdr, 0, sizeof(ckpt_hdr)); + ckpt_hdr.ckpt_rec_type = LGS_CKPT_PUSH_ASYNC; + ckpt_hdr.num_ckpt_records = num_rec; + ncs_encode_32bit(&pheader, ckpt_hdr.ckpt_rec_type); + ncs_encode_32bit(&pheader, ckpt_hdr.num_ckpt_records); + ncs_encode_32bit(&pheader, ckpt_hdr.data_len); + return NCSCC_RC_SUCCESS; +} + +int Cache::DecodeColdSync(NCS_UBAID* uba, lgsv_ckpt_header_t* header, + void* vdata, void** vckpt_rec, + size_t ckpt_rec_size) const { + TRACE_ENTER(); + assert(is_active() == false && "This instance does not run with standby role"); + if (lgs_is_peer_v8() == false) return NCSCC_RC_SUCCESS; + + assert(uba && header && "Either uba or header is nullptr"); + if (dec_ckpt_header(uba, header) != NCSCC_RC_SUCCESS) { + LOG_NO("lgs_dec_ckpt_header FAILED"); + return NCSCC_RC_FAILURE; + } + + if (header->ckpt_rec_type != LGS_CKPT_PUSH_ASYNC) { + LOG_NO("failed: LGS_CKPT_PUSH_ASYNC type is expected, got %u", + header->ckpt_rec_type); + return NCSCC_RC_FAILURE; + } + + uint32_t num_rec = header->num_ckpt_records; + int rc = NCSCC_RC_SUCCESS; + EDU_ERR ederror; + lgsv_ckpt_msg_v8_t msg_v8; + auto data = &msg_v8.ckpt_rec.push_async; + CkptPushAsync* cache_data; + while (num_rec) { + cache_data = data; + rc = m_NCS_EDU_EXEC(&lgs_cb->edu_hdl, EncodeDecodePushAsync, + uba, EDP_OP_TYPE_DEC, + &cache_data, &ederror); + if (rc != NCSCC_RC_SUCCESS) { + m_NCS_EDU_PRINT_ERROR_STRING(ederror); + return rc; + } + + rc = process_ckpt_data(lgs_cb, vdata); + if (rc != NCSCC_RC_SUCCESS) return rc; + + memset(*vckpt_rec, 0, ckpt_rec_size); + --num_rec; + } + return NCSCC_RC_SUCCESS; +} + +void Cache::PeriodicCheck() { + if (empty() == true || is_active() == false) return; + CleanOverdueData(); + if (is_iothread_ready() == true) { + Flush(); + } +} + +void Cache::Write(std::shared_ptr<Data> data) { + TRACE_ENTER(); + // The resilience feature is disable. 
Fwd request to I/O thread right away. + if (Capacity() == 0) { + int rc = data->Write(); + if (rc == -1 || rc == -2) { + data->AckToClient(SA_AIS_ERR_TRY_AGAIN); + return; + } + // Write OK, then do post processings. + PostWrite(data); + return; + } + + // The resilience feature is enabled. Caching the request if needed. + if (empty() == true && is_iothread_ready() == true) { + int rc = data->Write(); + // TODO(vu.m.nguyen): the error code is very unclear to know + // what '-1' and '-2' really mean. Should be improved? + if (rc == -1 || rc == -2) { + Push(data); + return; + } + // Write OK, then do post processings. + PostWrite(data); + return; + } + Push(data); + Flush(); +} + +void Cache::PostWrite(std::shared_ptr<Data> data) { + data->Streaming(); + data->SyncWriteWithStandby(); + data->AckToClient(SA_AIS_OK); +} + +void Cache::CleanOverdueData() { + if (empty() == true || is_active() == false) return; + std::string reason{"Ok"}; + auto data = Front(); + if (data->is_valid(&reason) == false) { + // Either the targeting stream has been closed or the owner is dead. + // syslog the detailed info about dropped log record if latter case. + if (data->is_client_alive() == false) { + LOG_NO("Drop invalid log record, reason: %s", reason.c_str()); + LOG_NO("The record info: %s", data->record()); + } else { + data->AckToClient(SA_AIS_ERR_TRY_AGAIN); + } + Pop(false); + } +} + +void Cache::Flush() { + if (empty() || !is_active() || !is_iothread_ready()) return; + auto data = Front(); + int rc = data->Write(); + // Write still gets timeout, do nothing. + if ((rc == -1) || (rc == -2)) return; + // Record is successfully written. Do post processings. 
+ data->Streaming(); + data->AckToClient(SA_AIS_OK); + Pop(true); +} + +void Cache::Push(std::shared_ptr<Data> data) { + TRACE_ENTER(); + if (full() == true) { + data->AckToClient(SA_AIS_ERR_TRY_AGAIN); + return; + } + if (is_active() == true) { + data->SyncPushWithStandby(); + } + pending_write_async_.push_back(data); + TRACE("Number of pending reqs after push: %zu", size()); +} + +void Cache::Pop(bool wstatus) { + TRACE_ENTER(); + auto data = Front(); + if (is_active() == true) { + if (wstatus == false) { + data->SyncPopWithStandby(); + } else { + data->SyncPopAndWriteWithStandby(); + } + } + pending_write_async_.pop_front(); + TRACE("Number of pending reqs after pop: %zu", size()); +} + +int Cache::GeneratePollTimeout(timespec last) const { + if (size() == 0 || !is_active()) return -1; + struct timespec passed_time; + struct timespec current = base::ReadMonotonicClock(); + osaf_timespec_subtract(¤t, &last, &passed_time); + auto passed_time_ms = osaf_timespec_to_millis(&passed_time); + return (passed_time_ms < 100) ? (100 - passed_time_ms) : 0; +} + +uint32_t Cache::timeout() const { + uint32_t timeout = *(static_cast<const uint32_t*>( + lgs_cfg_get(LGS_IMM_LOG_RESILIENCE_TIMEOUT))); + return timeout; +} + +size_t Cache::Capacity() const { + uint32_t max_size = *(static_cast<const uint32_t*>( + lgs_cfg_get(LGS_IMM_LOG_MAX_PENDING_WRITE_REQ))); + return max_size; +} diff --git a/src/log/logd/lgs_cache.h b/src/log/logd/lgs_cache.h new file mode 100644 index 000000000..c999e75bb --- /dev/null +++ b/src/log/logd/lgs_cache.h @@ -0,0 +1,287 @@ +/* -*- OpenSAF -*- + * + * Copyright Ericsson AB 2019 - All Rights Reserved. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed + * under the GNU Lesser General Public License Version 2.1, February 1999. 
+ * The complete license can be accessed from the following location: + * http://opensource.org/licenses/lgpl-license.php + * See the Copying file included with the OpenSAF distribution for full + * licensing terms. + * + * Author(s): Ericsson AB + * + */ + +#ifndef LOG_LOGD_LGS_CACHE_H_ +#define LOG_LOGD_LGS_CACHE_H_ + +#include <atomic> +#include <string.h> +#include <string> +#include <sstream> +#include <deque> +#include <memory> + +#include "log/logd/lgs.h" +#include "log/logd/lgs_mbcsv_cache.h" +#include "base/macros.h" + +// This atomic variable stores the readiness status of file hdle thread. +// It is set to false when the request just arrives at the file handling thread +// and is set to true when the thread is done with the file i/o request. +extern std::atomic<bool> is_filehdl_thread_ready; + +//> +// In order to improve resilience of OpenSAF LOG service when underlying +// file system is unresponsive, a queue is introduced to hold the async +// write request up to an configurable time that is around 15 - 30 seconds. +// +// Before passing the async write request to the file handling thread, +// the request have to go through this Cache class (singleton) via +// Cache::Write() method; if any pending requests in queue, the pending +// request have to go first if the file handling thread is ready; if +// the either having pending requests or file handling thread is not +// ready, the coming write request is pushed into the back of the queue. +// +// The Write() method also takes care of 1) doing checkpoint necessary data +// to the standby, 2) streaming the record to additional destinations, +// 3) giving acknowledgment to client and 4) updating the queue. +// +// Besides, the queue will be periodically monitored from the main poll +// via the method Cache::PeriodicCheck(). 
This periodic check includes +// 1) Check if any pending request is overdue, if so giving confirmation +// to client with SA_AIS_ERR_TRY_AGAIN code, sync with standby, and +// removing the item from the queue, 2) Check if any targeting stream is closed, +// then do the same as above the case - request is overdue, 3) Check if the +// file handling thread is ready, then forwarding the front request to +// that thread, syncing with standby, and ack to client. +// +// This feature is only enabled if the queue capacity is set to an non-zero +// value via the attribute `logMaxPendingWriteRequests`; Default is disabled +// to keep service backward compatible. +// +// The resilient time is confiruable via the attribute `logResilienceTimeout`. +// +// This class is used by both active and standby log server. +//< +class Cache { + public: + // This is unique entry point for outside world to access Cache methods. + static Cache* instance() { + // Thread safe since C++1y + static Cache cache; + return &cache; + } + + ~Cache() { + pending_write_async_.clear(); + } + + // A part of data that is stored in the queue. The below info is almost + // provided by the log client that have passed into write async request. + // We need these these extra information for 1) Streaming + // 2) Ack to client, 3) Update the log_stream_t if needed. + // We put this part into a separate structure to simplify the way of + // intializing the queued data. + struct WriteAsyncInfo { + SaInvocationT invocation; + uint32_t ack_flags; + uint32_t client_id; + uint32_t stream_id; + char* svc_name; + SaTimeT log_stamp; + SaLogSeverityT severity; + MDS_DEST dest; + char* from_node; + + // Constructors. One for forming cache data from async write event, + // the other one is for forming cache data on standby instance from + // push async event. 
+ WriteAsyncInfo(WriteAsyncParam* param, MDS_DEST dest, + const char* node_name); + explicit WriteAsyncInfo(const CkptPushAsync* data); + + ~WriteAsyncInfo() { + // these attributes are either nullptr or point to valid memories. + // nullptr if the data is targettng to alarm/notif streams. + free(from_node); + free(svc_name); + } + + // Show the info of myself in case the request is dropped. + std::string info() const; + // Dump values of above data - using for debugging almost. + void Dump() const; + // Clone a copy of my data into `CkptPushAsync` for synching + // with standby. + void CloneData(CkptPushAsync* output) const; + + // Check if the client whose owns this WriteAsyncInfo data + // is alive. True if alive, false otherwise. + bool is_client_alive() const { + return lgs_client_get_by_id(client_id) != nullptr; + } + + // Check if the targeting stream of this data is openning or not. + // True if open, false otherwise. + bool is_stream_open() const { + return log_stream_get_by_id(stream_id) != nullptr; + } + + // Get the stream instance which this data is targetting. + log_stream_t* stream() const { + return log_stream_get_by_id(stream_id); + } + }; + + // This class reprensents the actual data that the queue stores in. + // In addition of above info, the data also holds the time showing + // when the data is put into queue, the unique sequence id of the data + // and the full log record containing right format that complies with + // tokens given to the targeting stream. + class Data { + public: + // Constructors. One for forming cache data from async write event, + // the other one is for forming cache data on standby instance from + // push async event. + Data(std::shared_ptr<WriteAsyncInfo> info, char* log_record, int size); + explicit Data(const CkptPushAsync* data); + + ~Data() { + free(log_record_); + } + + // Show detailed information about this data. Benefit for logging when + // the record is dropped. 
+ std::string info() const { return param_->info(); } + // Check if the client owning this data is still alive. + bool is_client_alive() const { return param_->is_client_alive(); } + // Check if the targeting stream is opening. + bool is_stream_open() const { return param_->is_stream_open(); } + // Get the full log record. + char* record() const { return log_record_; } + // Check if the data is valid or not. The data is not valid if either + // the targeting stream is closed or the the time of its staying in the + // queue is reaching the maximum. + bool is_valid(std::string* reason) const; + // Dump the values of data's attributes. + void Dump() const; + // Clone values of my attributes to `CkptPushAsync`; and CkptPushAsync + // value is used for synching with standby. + void CloneData(CkptPushAsync* data) const; + // Synch necessary data to standby in case of pushing a write async + // to the queue. This is only valid to active log service. + int SyncPushWithStandby() const; + // Synch necessary data to standby in case of pop a write async + // from the queue. This is only valid to active log service. + int SyncPopWithStandby() const; + // Sync necessary data to standby in case of successfully writing + // a async write request. ONly valid to active log service. + int SyncWriteWithStandby() const; + // Sync necessary data to standby in case of successfully writing + // a async write request after the file handling thread transits + // from unreadiness to readiness. In other word, this is a combination + // b/w SyncPopWithStandby and SyncWriteWithStandby, but we put the case + // into a separated request to optimize the traffic load. + int SyncPopAndWriteWithStandby() const; + // Forward the data to the file handling thread. + int Write() const; + // Send acknowledge with given code to client if the client is still alive + // and the client is desired to receive the confirmation. 
+ void AckToClient(SaAisErrorT code) const; + // Performing streaming this data if needed. + void Streaming() const; + // Check if the data has been stayed in the queue so long - reaching + // the maximum setting time. + bool is_overdue() const; + + // Store the local time when log server starts to process the write request + uint64_t queue_at_; + // The unique id for this data + uint64_t seq_id_; + // Write async info which is comming from log client via write async request + std::shared_ptr<WriteAsyncInfo> param_; + // The full log record which already complied with stream format + char* log_record_; + // The record size + int size_; + }; + + // Verify if the given capacity `max` is valid. The value is considered + // valid if the value is either not larger than 1000 or less than current size + // of the queue. Default capacity is zero (0). + bool VerifyMaxQueueSize(uint32_t max) const { + if (max <= 1000 && max >= size()) return true; + return false; + } + // Verify if the given resilient time is valid or not. The valid value + // is in range [15 - 30] seconds. Default value is 15s. + bool VerifyResilienceTime(uint32_t time) const { + if (time >= 15 && time <= 30) return true; + return false; + } + + // Return the queue size + size_t size() const { return pending_write_async_.size(); } + // Get the reference to the front element + std::shared_ptr<Data> Front() const { return pending_write_async_.front(); } + // Generate the approriate poll timeout depending on the last poll run, + // queue size and HA state of log server instance. + int GeneratePollTimeout(timespec last) const; + // Pop the front element from queue. wstatus shows if the going-to-pop + // request has been successfully written to log file (wstatus = true) + // or it has been dropped due to the data is invalid (wstatus = false). 
+ void Pop(bool wstatus = false); + // Periodic check the data in queue whether if any of them is invalid + // and also check if the file handling thread state turns to ready. + void PeriodicCheck(); + // Forward the data to the file handling thread or put back into + // the queue depending on the readiness of the thread/and queue status. + void Write(std::shared_ptr<Data> data); + // Push the data back into the queue. + void Push(std::shared_ptr<Data> data); + // Return the queue's capacity. + size_t Capacity() const; + // Encode the queue at cold sync on active side. + int EncodeColdSync(NCS_UBAID* uba) const; + // Decode the queue on stanby side. + int DecodeColdSync(NCS_UBAID* uba, lgsv_ckpt_header_t* header, + void* vdata, void** vckpt_rec, + size_t ckpt_rec_size) const; + + private: + // Don't allow to instantiate this object. + Cache() : pending_write_async_{} {} + + // true if the queue is empty. + bool empty() const { return pending_write_async_.empty(); } + // true if the queue is full - reaching the given capacity. + bool full() const { return size() == Capacity(); } + // true if the file handling thread is ready. + bool is_iothread_ready() const { return is_filehdl_thread_ready; } + // Flush the front element of the queue. + void Flush(); + // Remove the front if its data is no longer valid. + void CleanOverdueData(); + // Return the setting resilience timeout + uint32_t timeout() const; + // Jobs need to be done after writing record to file successfully. + // 1) streaming to destination 2) sync with standby 3) ack to client + void PostWrite(std::shared_ptr<Data> data); + + private: + // Use std::deque<> rather std::queue because we need to access + // all elements at once during cold sync. Adding to this queue + // when getting timeout from I/O thread, and removing from this + // queue when the data has successfully written to log file. + // This queue is always kept in sync with standby. 
+ std::deque<std::shared_ptr<Data> > pending_write_async_; + + DELETE_COPY_AND_MOVE_OPERATORS(Cache); +}; + +#endif // LOG_LOGD_LGS_CACHE_H_ + diff --git a/src/log/logd/lgs_config.cc b/src/log/logd/lgs_config.cc index 44e10b84d..f2af48ed0 100644 --- a/src/log/logd/lgs_config.cc +++ b/src/log/logd/lgs_config.cc @@ -42,7 +42,7 @@ #include "log/logd/lgs.h" #include "log/logd/lgs_common.h" #include "log/logd/lgs_oi_admin.h" - +#include "log/logd/lgs_cache.h" /* Mutex for making read and write of configuration data thread safe */ pthread_mutex_t lgs_config_data_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -83,6 +83,8 @@ static struct lgs_conf_def_t { SaUint32T logMaxApplicationStreams; SaUint32T logFileIoTimeout; SaUint32T logFileSysConfig; + SaUint32T logResilienceTimeout; + SaUint32T logMaxPendingWriteReq; lgs_conf_def_t() { logRootDirectory = PKGLOGDIR; @@ -96,6 +98,8 @@ static struct lgs_conf_def_t { logMaxApplicationStreams = 64; logFileIoTimeout = 500; logFileSysConfig = 1; + logResilienceTimeout = 15; + logMaxPendingWriteReq = 0; } } lgs_conf_def; @@ -115,6 +119,8 @@ typedef struct _lgs_conf_t { SaUint32T logMaxApplicationStreams; SaUint32T logFileIoTimeout; SaUint32T logFileSysConfig; + SaUint32T logResilienceTimeout; + SaUint32T logMaxPendingWriteReq; std::vector<std::string> logRecordDestinationConfiguration; // Default empty /* --- end correspond to IMM Class --- */ @@ -139,6 +145,8 @@ typedef struct _lgs_conf_t { lgs_conf_flg_t logDataGroupname_cnfflag; lgs_conf_flg_t logStreamFileFormat_cnfflag; lgs_conf_flg_t logRecordDestinationConfiguration_cnfflag; + lgs_conf_flg_t logResilienceTimeout_cnfflag; + lgs_conf_flg_t logMaxPendingWriteReq_cnfflag; _lgs_conf_t() : logRootDirectory{PKGLOGDIR}, @@ -153,7 +161,9 @@ typedef struct _lgs_conf_t { logFileSysConfig_cnfflag{LGS_CNF_DEF}, logDataGroupname_cnfflag{LGS_CNF_DEF}, logStreamFileFormat_cnfflag{LGS_CNF_DEF}, - logRecordDestinationConfiguration_cnfflag{LGS_CNF_DEF} { + 
logRecordDestinationConfiguration_cnfflag{LGS_CNF_DEF}, + logResilienceTimeout_cnfflag{LGS_CNF_DEF}, + logMaxPendingWriteReq_cnfflag{LGS_CNF_DEF} { OpenSafLogConfig_object_exist = false; /* * The following attributes cannot be configured in the config file @@ -171,6 +181,8 @@ typedef struct _lgs_conf_t { logMaxApplicationStreams = lgs_conf_def.logMaxApplicationStreams; logFileIoTimeout = lgs_conf_def.logFileIoTimeout; logFileSysConfig = lgs_conf_def.logFileSysConfig; + logResilienceTimeout = lgs_conf_def.logResilienceTimeout; + logMaxPendingWriteReq = lgs_conf_def.logMaxPendingWriteReq; } } lgs_conf_t; @@ -453,6 +465,18 @@ int lgs_cfg_update(const lgs_config_chg_t *config_data) { (SaUint32T)strtoul(value_str, nullptr, 0); } else if (strcmp(name_str, LOG_FILE_IO_TIMEOUT) == 0) { lgs_conf.logFileIoTimeout = (SaUint32T)strtoul(value_str, nullptr, 0); + } else if (strcmp(name_str, LOG_RESILIENCE_TIMEOUT) == 0) { + lgs_conf.logResilienceTimeout = (SaUint32T)strtoul(value_str, nullptr, 0); + } else if (strcmp(name_str, LOG_MAX_PENDING_WRITE_REQ) == 0) { + lgs_conf.logMaxPendingWriteReq = + (SaUint32T)strtoul(value_str, nullptr, 0); + +#ifdef SIMULATE_NFS_UNRESPONSE + // NOTE(vu.m.nguyen): not thread-safe, but only for test. + // This is to sync the counter b/w active and standby. 
+ if (lgs_conf.logMaxPendingWriteReq == 0) test_counter = 1; +#endif + } else if (strcmp(name_str, LOG_FILE_SYS_CONFIG) == 0) { lgs_conf.logFileSysConfig = (SaUint32T)strtoul(value_str, nullptr, 0); } else if (strcmp(name_str, LOG_RECORD_DESTINATION_CONFIGURATION) == 0) { @@ -948,6 +972,19 @@ static int verify_all_init() { rc = -1; } + if (!Cache::instance()->VerifyResilienceTime(lgs_conf.logResilienceTimeout)) { + lgs_conf.logResilienceTimeout = lgs_conf_def.logResilienceTimeout; + lgs_conf.logResilienceTimeout_cnfflag = LGS_CNF_DEF; + rc = -1; + } + + if (!Cache::instance()->VerifyMaxQueueSize( + lgs_conf.logMaxPendingWriteReq)) { + lgs_conf.logMaxPendingWriteReq = lgs_conf_def.logMaxPendingWriteReq; + lgs_conf.logMaxPendingWriteReq_cnfflag = LGS_CNF_DEF; + rc = -1; + } + if (lgs_cfg_verify_log_filesys_config(lgs_conf.logFileSysConfig) == -1) { lgs_conf.logFileSysConfig = lgs_conf_def.logFileSysConfig; lgs_conf.logFileSysConfig_cnfflag = LGS_CNF_DEF; @@ -1090,6 +1127,14 @@ static void read_logsv_config_obj_2() { lgs_conf.logFileIoTimeout = *reinterpret_cast<SaUint32T *>(value); lgs_conf.logFileIoTimeout_cnfflag = LGS_CNF_OBJ; TRACE("Conf obj; logFileIoTimeout: %u", lgs_conf.logFileIoTimeout); + } else if (!strcmp(attribute->attrName, LOG_RESILIENCE_TIMEOUT)) { + lgs_conf.logResilienceTimeout = *reinterpret_cast<SaUint32T *>(value); + lgs_conf.logResilienceTimeout_cnfflag = LGS_CNF_OBJ; + TRACE("Conf obj; logResilienceTimeout: %u", lgs_conf.logResilienceTimeout); + } else if (!strcmp(attribute->attrName, LOG_MAX_PENDING_WRITE_REQ)) { + lgs_conf.logMaxPendingWriteReq = *reinterpret_cast<SaUint32T *>(value); + lgs_conf.logMaxPendingWriteReq_cnfflag = LGS_CNF_OBJ; + TRACE("Conf obj; logMaxPendingWriteRequests: %u", lgs_conf.logMaxPendingWriteReq); } else if (!strcmp(attribute->attrName, LOG_FILE_SYS_CONFIG)) { lgs_conf.logFileSysConfig = *reinterpret_cast<SaUint32T *>(value); lgs_conf.logFileSysConfig_cnfflag = LGS_CNF_OBJ; @@ -1440,6 +1485,12 @@ const void 
*lgs_cfg_get(lgs_logconfGet_t param) { case LGS_IMM_LOG_RECORD_DESTINATION_STATUS: value_ptr = &lgs_conf.logRecordDestinationStatus; break; + case LGS_IMM_LOG_RESILIENCE_TIMEOUT: + value_ptr = &lgs_conf.logResilienceTimeout; + break; + case LGS_IMM_LOG_MAX_PENDING_WRITE_REQ: + value_ptr = &lgs_conf.logMaxPendingWriteReq; + break; case LGS_IMM_LOG_NUMBER_OF_PARAMS: case LGS_IMM_LOG_NUMEND: @@ -1734,9 +1785,7 @@ void conf_runtime_obj_handler(SaImmOiHandleT immOiHandle, char *str_val = nullptr; SaUint32T u32_val = 0; SaAisErrorT ais_rc = SA_AIS_OK; - TRACE_ENTER(); - while ((attributeName = attributeNames[i++]) != nullptr) { if (!strcmp(attributeName, LOG_ROOT_DIRECTORY)) { str_val = const_cast<char *>(static_cast<const char *>( @@ -1798,6 +1847,23 @@ void conf_runtime_obj_handler(SaImmOiHandleT immOiHandle, ais_rc = immutil_update_one_rattr(immOiHandle, LGS_CFG_RUNTIME_OBJECT, attributeName, SA_IMM_ATTR_SAUINT32T, &u32_val); + } else if (!strcmp(attributeName, LOG_RESILIENCE_TIMEOUT)) { + u32_val = *static_cast<const SaUint32T *>( + lgs_cfg_get(LGS_IMM_LOG_RESILIENCE_TIMEOUT)); + ais_rc = immutil_update_one_rattr(immOiHandle, LGS_CFG_RUNTIME_OBJECT, + attributeName, SA_IMM_ATTR_SAUINT32T, + &u32_val); + } else if (!strcmp(attributeName, LOG_MAX_PENDING_WRITE_REQ)) { + u32_val = *static_cast<const SaUint32T *>( + lgs_cfg_get(LGS_IMM_LOG_MAX_PENDING_WRITE_REQ)); + ais_rc = immutil_update_one_rattr(immOiHandle, LGS_CFG_RUNTIME_OBJECT, + attributeName, SA_IMM_ATTR_SAUINT32T, + &u32_val); + } else if (!strcmp(attributeName, LOG_CURRENT_PENDING_WRITE_REQ)) { + u32_val = Cache::instance()->size(); + ais_rc = immutil_update_one_rattr(immOiHandle, LGS_CFG_RUNTIME_OBJECT, + attributeName, SA_IMM_ATTR_SAUINT32T, + &u32_val); } else if (!strcmp(attributeName, LOG_FILE_SYS_CONFIG)) { u32_val = *static_cast<const SaUint32T *>( lgs_cfg_get(LGS_IMM_LOG_FILE_SYS_CONFIG)); @@ -1872,6 +1938,10 @@ void lgs_trace_config() { cnfflag_str(lgs_conf.logFileIoTimeout_cnfflag)); 
TRACE("logFileSysConfig\t\t %u,\t %s", lgs_conf.logFileSysConfig, cnfflag_str(lgs_conf.logFileSysConfig_cnfflag)); + TRACE("logResilienceTimeout\t\t %u,\t %s", lgs_conf.logResilienceTimeout, + cnfflag_str(lgs_conf.logResilienceTimeout_cnfflag)); + TRACE("logMaxPendingWriteRequests\t\t %u,\t %s", lgs_conf.logMaxPendingWriteReq, + cnfflag_str(lgs_conf.logMaxPendingWriteReq_cnfflag)); // Multivalue: for (auto &conf_str : lgs_conf.logRecordDestinationConfiguration) { diff --git a/src/log/logd/lgs_config.h b/src/log/logd/lgs_config.h index 3f1b05e51..a6f88b3b1 100644 --- a/src/log/logd/lgs_config.h +++ b/src/log/logd/lgs_config.h @@ -65,6 +65,9 @@ #define LOG_FILE_SYS_CONFIG "logFileSysConfig" #define LOG_RECORD_DESTINATION_CONFIGURATION "logRecordDestinationConfiguration" #define LOG_RECORD_DESTINATION_STATUS "logRecordDestinationStatus" +#define LOG_RESILIENCE_TIMEOUT "logResilienceTimeout" +#define LOG_MAX_PENDING_WRITE_REQ "logMaxPendingWriteRequests" +#define LOG_CURRENT_PENDING_WRITE_REQ "logCurrentPendingWriteRequests" typedef enum { LGS_IMM_LOG_ROOT_DIRECTORY, @@ -80,7 +83,8 @@ typedef enum { LGS_IMM_LOG_FILE_SYS_CONFIG, LGS_IMM_LOG_RECORD_DESTINATION_CONFIGURATION, LGS_IMM_LOG_RECORD_DESTINATION_STATUS, - + LGS_IMM_LOG_RESILIENCE_TIMEOUT, + LGS_IMM_LOG_MAX_PENDING_WRITE_REQ, LGS_IMM_LOG_NUMBER_OF_PARAMS, LGS_IMM_LOG_OPENSAFLOGCONFIG_CLASS_EXIST, @@ -114,6 +118,10 @@ static inline lgs_logconfGet_t param_name_to_id(const std::string &param_name) { return LGS_IMM_LOG_RECORD_DESTINATION_CONFIGURATION; } else if (param_name == LOG_RECORD_DESTINATION_STATUS) { return LGS_IMM_LOG_RECORD_DESTINATION_STATUS; + } else if (param_name == LOG_MAX_PENDING_WRITE_REQ) { + return LGS_IMM_LOG_MAX_PENDING_WRITE_REQ; + } else if (param_name == LOG_RESILIENCE_TIMEOUT) { + return LGS_IMM_LOG_RESILIENCE_TIMEOUT; } else { return LGS_IMM_LOG_NUMEND; // Error } diff --git a/src/log/logd/lgs_evt.cc b/src/log/logd/lgs_evt.cc index 35d939a4b..7501a282b 100644 --- a/src/log/logd/lgs_evt.cc +++ 
b/src/log/logd/lgs_evt.cc @@ -32,6 +32,8 @@ #include "log/logd/lgs_clm.h" #include "log/logd/lgs_dest.h" #include "log/logd/lgs_oi_admin.h" +#include "log/logd/lgs_mbcsv.h" +#include "log/logd/lgs_cache.h" void *client_db = nullptr; /* used for C++ STL map */ @@ -1284,17 +1286,11 @@ static uint32_t proc_write_log_async_msg(lgs_cb_t *cb, lgsv_lgs_evt_t *evt) { lgsv_write_log_async_req_t *param = &(evt->info.msg.info.api_info.param).write_log_async; log_stream_t *stream = nullptr; - SaAisErrorT error = SA_AIS_OK; SaStringT logOutputString = nullptr; SaUint32T buf_size; - int n, rc = 0; - lgsv_ckpt_msg_v1_t ckpt_v1; - lgsv_ckpt_msg_v2_t ckpt_v2; - void *ckpt_ptr; + int n = 0; uint32_t max_logrecsize = 0; char node_name[_POSIX_HOST_NAME_MAX]; - RecordData data; - timespec time; memset(node_name, 0, _POSIX_HOST_NAME_MAX); strncpy(node_name, evt->node_name, _POSIX_HOST_NAME_MAX); @@ -1305,20 +1301,20 @@ static uint32_t proc_write_log_async_msg(lgs_cb_t *cb, lgsv_lgs_evt_t *evt) { // Client should try again when role changes is in transition. 
if (cb->is_quiesced_set) { TRACE("Log service is in quiesced state"); - error = SA_AIS_ERR_TRY_AGAIN; - goto done; + AckToWriteAsync(param, evt->fr_dest, SA_AIS_ERR_TRY_AGAIN); + return NCSCC_RC_SUCCESS; } if (lgs_client_get_by_id(param->client_id) == nullptr) { TRACE("Bad client ID: %u", param->client_id); - error = SA_AIS_ERR_BAD_HANDLE; - goto done; + AckToWriteAsync(param, evt->fr_dest, SA_AIS_ERR_BAD_HANDLE); + return NCSCC_RC_SUCCESS; } if ((stream = log_stream_get_by_id(param->lstr_id)) == nullptr) { TRACE("Bad stream ID: %u", param->lstr_id); - error = SA_AIS_ERR_BAD_HANDLE; - goto done; + AckToWriteAsync(param, evt->fr_dest, SA_AIS_ERR_BAD_HANDLE); + return NCSCC_RC_SUCCESS; } /* Apply filtering only to system and application streams */ @@ -1326,7 +1322,8 @@ static uint32_t proc_write_log_async_msg(lgs_cb_t *cb, lgsv_lgs_evt_t *evt) { ((stream->severityFilter & (1 << param->logRecord->logHeader.genericHdr.logSeverity)) == 0)) { stream->filtered++; - goto done; + AckToWriteAsync(param, evt->fr_dest, SA_AIS_OK); + return NCSCC_RC_SUCCESS; } /* @@ -1342,127 +1339,25 @@ static uint32_t proc_write_log_async_msg(lgs_cb_t *cb, lgsv_lgs_evt_t *evt) { calloc(1, buf_size + 1)); /* Make room for a '\0' termination */ if (logOutputString == nullptr) { LOG_ER("Could not allocate %d bytes", stream->fixedLogRecordSize + 1); - error = SA_AIS_ERR_NO_MEMORY; - goto done; + AckToWriteAsync(param, evt->fr_dest, SA_AIS_ERR_NO_MEMORY); + return NCSCC_RC_SUCCESS; } if ((n = lgs_format_log_record( param->logRecord, stream->logFileFormat, stream->maxLogFileSize, stream->fixedLogRecordSize, buf_size, logOutputString, ++stream->logRecordId, node_name)) == 0) { - error = SA_AIS_ERR_INVALID_PARAM; - goto done; + // Formatting failed before the buffer was handed to the cache: free it here. + free(logOutputString); + AckToWriteAsync(param, evt->fr_dest, SA_AIS_ERR_INVALID_PARAM); + return NCSCC_RC_SUCCESS; } - rc = log_stream_write_h(stream, logOutputString, n); - - /* '\0' terminate log record string before check pointing. 
- * Since the log record always is a string '\0' can be used instead of - * using an extra parameter for buffer size. - */ logOutputString[n] = '\0'; - - /* Always return try again on stream write error */ - if ((rc == -1) || (rc == -2)) { - error = SA_AIS_ERR_TRY_AGAIN; - goto done; - } - - //> - // Has successfully written log record to file. - // Now, send to destination if any destination name set. - //< - - // Streaming not support on alarm/notif streams. - if ((stream->name == SA_LOG_STREAM_ALARM) || - (stream->name == SA_LOG_STREAM_NOTIFICATION)) { - goto checkpoint; - } - - // Packing Record data that carry necessary information - // to form RFC5424 syslog msg, then send to destination name(s). - data.name = stream->name.c_str(); - data.logrec = logOutputString; - data.hostname = node_name; - data.networkname = lgs_get_networkname().c_str(); - data.appname = osaf_extended_name_borrow( - param->logRecord->logHeader.genericHdr.logSvcUsrName); - data.msgid = stream->rfc5424MsgId.c_str(); - data.isRtStream = stream->isRtStream; - data.recordId = stream->logRecordId; - data.sev = param->logRecord->logHeader.genericHdr.logSeverity; - time.tv_sec = (param->logRecord->logTimeStamp / (SaTimeT)SA_TIME_ONE_SECOND); - time.tv_nsec = (param->logRecord->logTimeStamp % (SaTimeT)SA_TIME_ONE_SECOND); - data.time = time; - - WriteToDestination(data, stream->dest_names); - -checkpoint: - /* TODO: send fail back if ack is wanted, Fix counter for application stream!! 
- */ - if (cb->ha_state == SA_AMF_HA_ACTIVE) { - if (lgs_is_peer_v2()) { - memset(&ckpt_v2, 0, sizeof(ckpt_v2)); - ckpt_v2.header.ckpt_rec_type = LGS_CKPT_LOG_WRITE; - ckpt_v2.header.num_ckpt_records = 1; - ckpt_v2.header.data_len = 1; - ckpt_v2.ckpt_rec.write_log.recordId = stream->logRecordId; - ckpt_v2.ckpt_rec.write_log.streamId = stream->streamId; - ckpt_v2.ckpt_rec.write_log.curFileSize = stream->curFileSize; - ckpt_v2.ckpt_rec.write_log.logFileCurrent = - const_cast<char *>(stream->logFileCurrent.c_str()); - ckpt_v2.ckpt_rec.write_log.logRecord = logOutputString; - ckpt_v2.ckpt_rec.write_log.c_file_close_time_stamp = - stream->act_last_close_timestamp; - ckpt_ptr = &ckpt_v2; - } else { - memset(&ckpt_v1, 0, sizeof(ckpt_v1)); - ckpt_v1.header.ckpt_rec_type = LGS_CKPT_LOG_WRITE; - ckpt_v1.header.num_ckpt_records = 1; - ckpt_v1.header.data_len = 1; - ckpt_v1.ckpt_rec.write_log.recordId = stream->logRecordId; - ckpt_v1.ckpt_rec.write_log.streamId = stream->streamId; - ckpt_v1.ckpt_rec.write_log.curFileSize = stream->curFileSize; - ckpt_v1.ckpt_rec.write_log.logFileCurrent = - const_cast<char *>(stream->logFileCurrent.c_str()); - ckpt_ptr = &ckpt_v1; - } - - (void)lgs_ckpt_send_async(cb, ckpt_ptr, NCS_MBCSV_ACT_ADD); - } - - /* Save stb_recordId. Used by standby if configured for split file system. - * It's save here in order to contain a correct value if this node becomes - * standby. - */ - stream->stb_logRecordId = stream->logRecordId; - -done: - /* - Since the logOutputString is referred by the log handler thread, in timeout - case, the log API thread might be still using the log record memory. - - To make sure there is no corruption of memory usage in case of time-out (rc - = -2), We leave the log record memory freed to the log handler thread.. - - It is never a good idea to allocate and free memory in different places. - But consider it as a trade-off to have a better performance of LOGsv - as time-out occurs very rarely. 
- - Other cases, the allocator frees it. - */ - if ((rc != -2) && (logOutputString != nullptr)) { - free(logOutputString); - logOutputString = nullptr; - } - - if (param->ack_flags == SA_LOG_RECORD_WRITE_ACK) - lgs_send_write_log_ack(param->client_id, param->invocation, error, - evt->fr_dest); - - lgs_free_write_log(param); - - TRACE_LEAVE2("write status %s", saf_error(error)); + auto info = std::make_shared<Cache::WriteAsyncInfo>(param, + evt->fr_dest, node_name); + auto data = std::make_shared<Cache::Data>(info, logOutputString, n); + Cache::instance()->Write(data); return NCSCC_RC_SUCCESS; } @@ -1572,3 +1465,19 @@ void lgs_process_mbx(SYSF_MBX *mbx) { lgs_evt_destroy(msg); } } + +//> +// Below code are added in the scope of improving the resilience of log server +//< + +void AckToWriteAsync(WriteAsyncParam* req, MDS_DEST dest, + SaAisErrorT code) { + if (req->ack_flags != SA_LOG_RECORD_WRITE_ACK) { + lgs_free_write_log(req); + return; + } + lgs_send_write_log_ack(req->client_id, req->invocation, code, dest); + lgs_free_write_log(req); +} + +bool is_active() { return lgs_cb->ha_state == SA_AMF_HA_ACTIVE; } diff --git a/src/log/logd/lgs_evt.h b/src/log/logd/lgs_evt.h index ff912cbeb..a4b140eee 100644 --- a/src/log/logd/lgs_evt.h +++ b/src/log/logd/lgs_evt.h @@ -91,4 +91,14 @@ SaAisErrorT create_new_app_stream(lgsv_stream_open_req_t *open_sync_param, log_stream_t **o_stream); uint32_t lgs_client_map_init(); + +// Following changes are added to improve the resilience of log server +// Most code are put inside these separated files lgs_cache.{h,cc}. 
+using WriteAsyncParam = lgsv_write_log_async_req_t; +bool is_active(); +void AckToWriteAsync(WriteAsyncParam* req, MDS_DEST dest, SaAisErrorT code); +bool Streaming(const WriteAsyncParam* req, const char* from_node, + const char* record); + + #endif // LOG_LOGD_LGS_EVT_H_ diff --git a/src/log/logd/lgs_file.cc b/src/log/logd/lgs_file.cc index 2b216e849..b7b34228f 100644 --- a/src/log/logd/lgs_file.cc +++ b/src/log/logd/lgs_file.cc @@ -17,6 +17,7 @@ #include "log/logd/lgs_file.h" +#include <atomic> #include <stdlib.h> #include <stdio.h> #include <stdbool.h> @@ -35,6 +36,10 @@ #include "log/logd/lgs_config.h" #include "log/logd/lgs_filehdl.h" +// This global variable shows if the I/O thread is ready +// or it is being stuck due to underlying file system status. +std::atomic<bool> is_filehdl_thread_ready{true}; + pthread_mutex_t lgs_ftcom_mutex; /* For locking communication */ static pthread_cond_t request_cv; /* File thread waiting for request */ static pthread_cond_t answer_cv; /* API waiting for answer (timed) */ @@ -132,6 +137,7 @@ static void *file_hndl_thread(void *noparam) { * file I/O functions. Mutex is locked when _hdl function returns. 
*/ + is_filehdl_thread_ready = false; /* Invoke requested handler function */ switch (lgs_com_data.request_code) { case LGSF_FILEOPEN: @@ -208,7 +214,7 @@ static void *file_hndl_thread(void *noparam) { */ lgs_com_data.request_f = false; /* Prepare to take a new request */ lgs_com_data.request_code = LGSF_NOREQ; - + is_filehdl_thread_ready = true; /* The following cannot be done if the API has timed out */ if (lgs_com_data.timeout_f == false) { lgs_com_data.answer_f = true; diff --git a/src/log/logd/lgs_filehdl.cc b/src/log/logd/lgs_filehdl.cc index e683f8b68..0d7fb2b74 100644 --- a/src/log/logd/lgs_filehdl.cc +++ b/src/log/logd/lgs_filehdl.cc @@ -31,8 +31,15 @@ #include "base/osaf_time.h" #include "log/logd/lgs.h" +#ifdef SIMULATE_NFS_UNRESPONSE +#include "log/logd/lgs_cache.h" +#endif + extern pthread_mutex_t lgs_ftcom_mutex; /* For locking communication */ +#ifdef SIMULATE_NFS_UNRESPONSE +uint32_t test_counter = 1; +#endif /***************************************************************************** * File operation handlers: * This is the part of a file system handling function that shall execute in @@ -237,18 +244,35 @@ int write_log_record_hdl(void *indata, void *outdata, size_t max_outsize, off_t file_length = 0; wlrh_t *params_in = static_cast<wlrh_t *>(indata); /* Get log record pointed by lgs_rec pointer */ - char *logrecord = - const_cast<char *>(static_cast<const char *>(params_in->lgs_rec)); + const char *logrecord = static_cast<const char *>(params_in->lgs_rec); int *errno_out_p = static_cast<int *>(outdata); + + // Store the data to a tmp storage that is only available within this function + // scope. Doing this to avoid race on `params_in->lgs_rec` b/w filehld thread + // and the main thread; and with this, the main thread will have total right + // to free the allocated memory for `lgs_rec` whenever it wants. 
+ size_t data_size = params_in->record_size; + char data[data_size]; + memcpy(data, logrecord, data_size); + *errno_out_p = 0; TRACE_ENTER(); osaf_mutex_unlock_ordie(&lgs_ftcom_mutex); /* UNLOCK Critical section */ +#ifdef SIMULATE_NFS_UNRESPONSE + test_counter++; + if (Cache::instance()->Capacity() == 0) { + // disable the feature, so reset test counter + test_counter = 1; + } + + if (test_counter % 3 == 0) sleep(16); +#endif + retry: - rc = write(params_in->fd, &logrecord[bytes_written], - params_in->record_size - bytes_written); + rc = write(params_in->fd, &data[bytes_written], data_size - bytes_written); if (rc == -1) { if (errno == EINTR) goto retry; @@ -259,7 +283,7 @@ retry: } else { /* Handle partial writes */ bytes_written += rc; - if (bytes_written < params_in->record_size) goto retry; + if (bytes_written < data_size) goto retry; } osaf_mutex_lock_ordie(&lgs_ftcom_mutex); /* LOCK after critical section */ @@ -286,30 +310,6 @@ retry: } done: - /* - Log record memory is allocated by the caller and this memory is - used or referred by main thread as well as log handler thread. - - In most cases, the caller thread is blocked until the log handler thread - finishs the request (e.g: finish writing log record to file). - Therefore, once the caller thread is unblocked, it is safe to free - the log record memory as the log handler thread no longer uses it. - - But in time-out case, it is unsure when the log handler thread - use the log record memory. - - To make sure there is no corruption of memory usage in case of time-out, - We leave the log record memory freed at the end of this function. - - It is never a good idea to allocate and free memory in different places. - But consider it as a trade-off to have a better performance of LOGsv - as time-out occurs very rarely. 
- */ - if ((*timeout_f == true) && (logrecord != nullptr)) { - free(logrecord); - logrecord = nullptr; - } - TRACE_LEAVE2("rc = %d", rc); return rc; } diff --git a/src/log/logd/lgs_imm.cc b/src/log/logd/lgs_imm.cc index 578b86c6f..24318bf90 100644 --- a/src/log/logd/lgs_imm.cc +++ b/src/log/logd/lgs_imm.cc @@ -49,13 +49,14 @@ #include "log/logd/lgs_config.h" #include "log/logd/lgs_dest.h" #include "log/logd/lgs_oi_admin.h" -#include "base/saf_error.h" +#include "log/logd/lgs_cache.h" -#include "lgs_mbcsv_v1.h" -#include "lgs_mbcsv_v2.h" -#include "lgs_mbcsv_v3.h" -#include "lgs_mbcsv_v5.h" -#include "lgs_mbcsv_v6.h" +#include "log/logd/lgs_mbcsv_v1.h" +#include "log/logd/lgs_mbcsv_v2.h" +#include "log/logd/lgs_mbcsv_v3.h" +#include "log/logd/lgs_mbcsv_v5.h" +#include "log/logd/lgs_mbcsv_v6.h" +#include "base/saf_error.h" /* TYPE DEFINITIONS * ---------------- @@ -770,6 +771,24 @@ static SaAisErrorT config_ccb_completed_modify( goto done; } TRACE("logFileIoTimeout: %d value is accepted", logFileIoTimeout); + } else if (!strcmp(attribute->attrName, LOG_RESILIENCE_TIMEOUT)) { + SaUint32T timeout = *((SaUint32T *)value); + if (!Cache::instance()->VerifyResilienceTime(timeout)) { + report_oi_error(immOiHandle, opdata->ccbId, "%s value is NOT accepted", + attribute->attrName); + ais_rc = SA_AIS_ERR_INVALID_PARAM; + goto done; + } + TRACE("logResilienceTimeout: %u value is accepted", timeout); + } else if (!strcmp(attribute->attrName, LOG_MAX_PENDING_WRITE_REQ)) { + SaUint32T max = *((SaUint32T *)value); + if (!Cache::instance()->VerifyMaxQueueSize(max)) { + report_oi_error(immOiHandle, opdata->ccbId, "%s value is NOT accepted", + attribute->attrName); + ais_rc = SA_AIS_ERR_INVALID_PARAM; + goto done; + } + TRACE("logMaxPendingWriteRequests: %u value is accepted", max); } else if (!strcmp(attribute->attrName, LOG_FILE_SYS_CONFIG)) { report_oi_error(immOiHandle, opdata->ccbId, "%s cannot be changed", attribute->attrName); @@ -2081,6 +2100,15 @@ static void 
config_ccb_apply_modify(const CcbUtilOperationData_t *opdata) { uint32_val = *(SaUint32T *)value; snprintf(uint32_str, 20, "%u", uint32_val); lgs_cfgupd_list_create(LOG_FILE_IO_TIMEOUT, uint32_str, &config_data); + } else if (!strcmp(attribute->attrName, LOG_RESILIENCE_TIMEOUT)) { + uint32_val = *(SaUint32T *)value; + snprintf(uint32_str, 20, "%u", uint32_val); + lgs_cfgupd_list_create(LOG_RESILIENCE_TIMEOUT, uint32_str, &config_data); + } else if (!strcmp(attribute->attrName, LOG_MAX_PENDING_WRITE_REQ)) { + uint32_val = *(SaUint32T *)value; + snprintf(uint32_str, 20, "%u", uint32_val); + lgs_cfgupd_list_create(LOG_MAX_PENDING_WRITE_REQ, uint32_str, + &config_data); } else if (!strcmp(attribute->attrName, LOG_RECORD_DESTINATION_CONFIGURATION)) { // Note: Multi value attribute diff --git a/src/log/logd/lgs_main.cc b/src/log/logd/lgs_main.cc index 9767fe00d..b8ed96ec4 100644 --- a/src/log/logd/lgs_main.cc +++ b/src/log/logd/lgs_main.cc @@ -45,7 +45,7 @@ #include "log/logd/lgs_amf.h" #include "log/logd/lgs_oi_admin.h" #include "log/logd/lgs_imm.h" - +#include "log/logd/lgs_cache.h" /* ======================================================================== * DEFINITIONS @@ -485,6 +485,9 @@ int main(int argc, char *argv[]) { * "lost" streams is no longer possible. 
*/ const time_t CLEAN_TIMEOUT = 600; /* 10 min */ + struct timespec last = base::ReadMonotonicClock(); + const int kMaxEvent = 50; + int num_events = 0; TRACE_ENTER(); @@ -515,7 +518,6 @@ int main(int argc, char *argv[]) { fds[FD_IMM].events = POLLIN; lgs_cb->clmSelectionObject = lgs_cb->clm_init_sel_obj.rmv_obj; - while (1) { if (cltimer_fd < 0 && log_rtobj_list_no() != 0) { /* Needed only if any "lost" objects are found @@ -542,7 +544,9 @@ int main(int argc, char *argv[]) { nfds = FD_IMM; } - int ret = poll(fds, nfds, -1); + + int timeout = Cache::instance()->GeneratePollTimeout(last); + int ret = poll(fds, nfds, timeout); if (ret == -1) { if (errno == EINTR) continue; @@ -551,6 +555,13 @@ int main(int argc, char *argv[]) { break; } + if (ret == 0) { + Cache::instance()->PeriodicCheck(); + last = base::ReadMonotonicClock(); + num_events = 0; + continue; + } + if (fds[FD_TERM].revents & POLLIN) { daemon_exit(); } @@ -632,6 +643,13 @@ int main(int argc, char *argv[]) { break; } } + + num_events++; + if (num_events >= kMaxEvent) { + Cache::instance()->PeriodicCheck(); + num_events = 0; + last = base::ReadMonotonicClock(); + } } done: diff --git a/src/log/logd/lgs_mbcsv.cc b/src/log/logd/lgs_mbcsv.cc index 7e4abd6dd..f83d9ec20 100644 --- a/src/log/logd/lgs_mbcsv.cc +++ b/src/log/logd/lgs_mbcsv.cc @@ -28,8 +28,9 @@ #include "log/logd/lgs_mbcsv_v3.h" #include "log/logd/lgs_mbcsv_v2.h" #include "log/logd/lgs_mbcsv_v1.h" +#include "log/logd/lgs_mbcsv_cache.h" #include "log/logd/lgs_recov.h" - +#include "log/logd/lgs_cache.h" /* LGS_CKPT_DATA_HEADER 4 4 4 2 @@ -76,7 +77,6 @@ static uint32_t ckpt_proc_close_stream(lgs_cb_t *cb, void *data); static uint32_t ckpt_proc_cfg_stream(lgs_cb_t *cb, void *data); static void enc_ckpt_header(uint8_t *pdata, lgsv_ckpt_header_t header); -static uint32_t dec_ckpt_header(NCS_UBAID *uba, lgsv_ckpt_header_t *header); static uint32_t ckpt_decode_cbk_handler(NCS_MBCSV_CB_ARG *cbk_arg); static uint32_t mbcsv_callback( NCS_MBCSV_CB_ARG *arg); 
/* Common Callback interface to mbcsv */ @@ -96,16 +96,24 @@ static uint32_t ckpt_err_ind_cbk_handler(NCS_MBCSV_CB_ARG *arg); static uint32_t edu_enc_reg_list(lgs_cb_t *cb, NCS_UBAID *uba); static uint32_t edu_enc_streams(lgs_cb_t *cb, NCS_UBAID *uba); -static uint32_t process_ckpt_data(lgs_cb_t *cb, void *data); typedef uint32_t (*LGS_CKPT_HDLR)(lgs_cb_t *cb, void *data); static LGS_CKPT_HDLR ckpt_data_handler[] = { - ckpt_proc_initialize_client, ckpt_proc_finalize_client, - ckpt_proc_agent_down, ckpt_proc_log_write, - ckpt_proc_open_stream, ckpt_proc_close_stream, - ckpt_proc_cfg_stream, ckpt_proc_lgs_cfg_v2, - ckpt_proc_lgs_cfg_v3, ckpt_proc_lgs_cfg_v5}; + ckpt_proc_initialize_client, + ckpt_proc_finalize_client, + ckpt_proc_agent_down, + ckpt_proc_log_write, + ckpt_proc_open_stream, + ckpt_proc_close_stream, + ckpt_proc_cfg_stream, + ckpt_proc_lgs_cfg_v2, + ckpt_proc_lgs_cfg_v3, + ckpt_proc_lgs_cfg_v5, + ckpt_proc_push_async, + ckpt_proc_pop_async, + ckpt_proc_pop_write_async +}; /**************************************************************************** * Name : edp_ed_open_stream_rec @@ -471,6 +479,18 @@ bool lgs_is_peer_v7() { } } +/** + * Check if peer is version 8 (or later) + * @return bool + */ +bool lgs_is_peer_v8() { + if (lgs_cb->mbcsv_peer_version >= LGS_MBCSV_VERSION_8) { + return true; + } else { + return false; + } +} + /** * Check if configured for split file system. * If other node is version 1 split file system mode is not applicable. 
@@ -657,6 +677,13 @@ static uint32_t ckpt_enc_cold_sync_data(lgs_cb_t *lgs_cb, TRACE(" edu_enc_streams FAILED"); return NCSCC_RC_FAILURE; } + + rc = Cache::instance()->EncodeColdSync(&cbk_arg->info.encode.io_uba); + if (rc != NCSCC_RC_SUCCESS) { + LOG_NO("ColdSync of cached data FAILED."); + return NCSCC_RC_FAILURE; + } + /* Encode the Async Update Count at standby */ /* This will have the count of async updates that have been sent, @@ -881,6 +908,7 @@ static uint32_t edu_enc_reg_list(lgs_cb_t *cb, NCS_UBAID *uba) { static uint32_t ckpt_encode_async_update(lgs_cb_t *lgs_cb, EDU_HDL edu_hdl, NCS_MBCSV_CB_ARG *cbk_arg) { + lgsv_ckpt_msg_v8_t *data_v8 = NULL; lgsv_ckpt_msg_v6_t *data_v6 = NULL; lgsv_ckpt_msg_v5_t *data_v5 = NULL; lgsv_ckpt_msg_v3_t *data_v3 = NULL; @@ -893,7 +921,12 @@ static uint32_t ckpt_encode_async_update(lgs_cb_t *lgs_cb, EDU_HDL edu_hdl, TRACE_ENTER(); /* Set reo_hdl from callback arg to ckpt_rec */ - if (lgs_is_peer_v6()) { + if (lgs_is_peer_v8()) { + data_v8 = reinterpret_cast<lgsv_ckpt_msg_v8_t *>( + static_cast<long>(cbk_arg->info.encode.io_reo_hdl)); + vdata = data_v8; + edp_function = edp_ed_ckpt_msg_v8; + } else if (lgs_is_peer_v6()) { data_v6 = reinterpret_cast<lgsv_ckpt_msg_v6_t *>( static_cast<long>(cbk_arg->info.encode.io_reo_hdl)); vdata = data_v6; @@ -1047,7 +1080,7 @@ static uint32_t ckpt_decode_cbk_handler(NCS_MBCSV_CB_ARG *cbk_arg) { * @param edp_function[in] * @return */ -static uint32_t ckpt_decode_log_struct( +uint32_t ckpt_decode_log_struct( lgs_cb_t *cb, /* lgs cb data */ NCS_MBCSV_CB_ARG *cbk_arg, /* Mbcsv callback data */ void *ckpt_msg, /* Checkpointed message */ @@ -1055,7 +1088,7 @@ static uint32_t ckpt_decode_log_struct( EDU_PROG_HANDLER edp_function) /* EDP function for decoding */ { EDU_ERR ederror; - + TRACE_ENTER(); uint32_t rc = m_NCS_EDU_EXEC(&cb->edu_hdl, edp_function, &cbk_arg->info.decode.i_uba, EDP_OP_TYPE_DEC, &struct_ptr, &ederror); @@ -1071,13 +1104,19 @@ static uint32_t ckpt_decode_log_struct( static 
uint32_t ckpt_decode_log_write(lgs_cb_t *cb, void *ckpt_msg, NCS_MBCSV_CB_ARG *cbk_arg) { + TRACE_ENTER(); uint32_t rc = NCSCC_RC_SUCCESS; void *write_log; EDU_PROG_HANDLER edp_function; const int sleep_delay_ms = 10; const int max_waiting_time_ms = 100; - if (lgs_is_peer_v2()) { + if (lgs_is_peer_v8()) { + lgsv_ckpt_msg_v8_t *ckpt_msg_v8 = + static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg); + write_log = &ckpt_msg_v8->ckpt_rec.write_log; + edp_function = edp_ed_write_rec_v2; + } else if (lgs_is_peer_v2()) { lgsv_ckpt_msg_v2_t *ckpt_msg_v2 = static_cast<lgsv_ckpt_msg_v2_t *>(ckpt_msg); write_log = &ckpt_msg_v2->ckpt_rec.write_log; @@ -1119,7 +1158,12 @@ static uint32_t ckpt_decode_log_close(lgs_cb_t *cb, void *ckpt_msg, void *stream_close; EDU_PROG_HANDLER edp_function; - if (lgs_is_peer_v2()) { + if (lgs_is_peer_v8()) { + lgsv_ckpt_msg_v8_t *ckpt_msg_v8 = + static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg); + stream_close = &ckpt_msg_v8->ckpt_rec.stream_close; + edp_function = edp_ed_close_stream_rec_v2; + } else if (lgs_is_peer_v2()) { lgsv_ckpt_msg_v2_t *ckpt_msg_v2 = static_cast<lgsv_ckpt_msg_v2_t *>(ckpt_msg); stream_close = &ckpt_msg_v2->ckpt_rec.stream_close; @@ -1145,7 +1189,12 @@ static uint32_t ckpt_decode_log_client_finalize(lgs_cb_t *cb, void *ckpt_msg, void *finalize_client; EDU_PROG_HANDLER edp_function; - if (lgs_is_peer_v2()) { + if (lgs_is_peer_v8()) { + lgsv_ckpt_msg_v8_t *ckpt_msg_v8 = + static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg); + finalize_client = &ckpt_msg_v8->ckpt_rec.finalize_client; + edp_function = edp_ed_finalize_rec_v2; + } else if (lgs_is_peer_v2()) { lgsv_ckpt_msg_v2_t *ckpt_msg_v2 = static_cast<lgsv_ckpt_msg_v2_t *>(ckpt_msg); finalize_client = &ckpt_msg_v2->ckpt_rec.finalize_client; @@ -1171,7 +1220,12 @@ static uint32_t ckpt_decode_log_client_down(lgs_cb_t *cb, void *ckpt_msg, void *client_down; EDU_PROG_HANDLER edp_function; - if (lgs_is_peer_v2()) { + if (lgs_is_peer_v8()) { + lgsv_ckpt_msg_v8_t *ckpt_msg_v8 = + 
static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg); + client_down = &ckpt_msg_v8->ckpt_rec.agent_down; + edp_function = edp_ed_agent_down_rec_v2; + } else if (lgs_is_peer_v2()) { lgsv_ckpt_msg_v2_t *ckpt_msg_v2 = static_cast<lgsv_ckpt_msg_v2_t *>(ckpt_msg); client_down = &ckpt_msg_v2->ckpt_rec.agent_down; @@ -1196,7 +1250,12 @@ static uint32_t ckpt_decode_log_cfg_stream(lgs_cb_t *cb, void *ckpt_msg, void *stream_cfg; EDU_PROG_HANDLER edp_function; - if (lgs_is_peer_v6()) { + if (lgs_is_peer_v8()) { + lgsv_ckpt_msg_v8_t *ckpt_msg_v8 = + static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg); + stream_cfg = &ckpt_msg_v8->ckpt_rec.stream_cfg; + edp_function = edp_ed_cfg_stream_rec_v6; + } else if (lgs_is_peer_v6()) { lgsv_ckpt_msg_v6_t *ckpt_msg_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(ckpt_msg); stream_cfg = &ckpt_msg_v6->ckpt_rec.stream_cfg; @@ -1225,12 +1284,17 @@ static uint32_t ckpt_decode_log_cfg(lgs_cb_t *cb, void *ckpt_msg, uint32_t rc = NCSCC_RC_SUCCESS; void *lgs_cfg = NULL; EDU_PROG_HANDLER edp_function = NULL; + lgsv_ckpt_msg_v8_t *ckpt_msg_v8; lgsv_ckpt_msg_v6_t *ckpt_msg_v6; lgsv_ckpt_msg_v5_t *ckpt_msg_v5; lgsv_ckpt_msg_v3_t *ckpt_msg_v3; lgsv_ckpt_msg_v2_t *ckpt_msg_v2; - if (lgs_is_peer_v6()) { + if (lgs_is_peer_v8()) { + ckpt_msg_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg); + lgs_cfg = &ckpt_msg_v8->ckpt_rec.lgs_cfg; + edp_function = edp_ed_lgs_cfg_rec_v5; + } else if (lgs_is_peer_v6()) { ckpt_msg_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(ckpt_msg); lgs_cfg = &ckpt_msg_v6->ckpt_rec.lgs_cfg; edp_function = edp_ed_lgs_cfg_rec_v5; @@ -1259,6 +1323,7 @@ static uint32_t ckpt_decode_log_cfg(lgs_cb_t *cb, void *ckpt_msg, return rc; } + /* END ckpt_decode_async_update helper functions */ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb, @@ -1274,6 +1339,8 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb, lgsv_ckpt_msg_v5_t *ckpt_msg_v5 = &msg_v5; lgsv_ckpt_msg_v6_t msg_v6; lgsv_ckpt_msg_v6_t *ckpt_msg_v6 = &msg_v6; + lgsv_ckpt_msg_v8_t msg_v8; + 
lgsv_ckpt_msg_v8_t *ckpt_msg_v8 = &msg_v8; void *ckpt_msg; lgsv_ckpt_header_t hdr, *hdr_ptr = &hdr; @@ -1281,7 +1348,6 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb, EDU_PROG_HANDLER edp_function_reg = NULL; /* Same in all versions */ lgs_ckpt_stream_open_t *stream_open; - TRACE_ENTER(); /* Decode the message header */ @@ -1295,7 +1361,10 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb, TRACE_2("\tckpt_rec_type: %d ", (int)hdr_ptr->ckpt_rec_type); - if (lgs_is_peer_v6()) { + if (lgs_is_peer_v8()) { + ckpt_msg_v8->header = hdr; + ckpt_msg = ckpt_msg_v8; + } else if (lgs_is_peer_v6()) { ckpt_msg_v6->header = hdr; ckpt_msg = ckpt_msg_v6; } else if (lgs_is_peer_v5()) { @@ -1316,7 +1385,10 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb, switch (hdr_ptr->ckpt_rec_type) { case LGS_CKPT_CLIENT_INITIALIZE: TRACE_2("\tINITIALIZE REC: UPDATE"); - if (lgs_is_peer_v6()) { + if (lgs_is_peer_v8()) { + reg_rec = &ckpt_msg_v8->ckpt_rec.initialize_client; + edp_function_reg = edp_ed_reg_rec_v6; + } else if (lgs_is_peer_v6()) { reg_rec = &ckpt_msg_v6->ckpt_rec.initialize_client; edp_function_reg = edp_ed_reg_rec_v6; } else if (lgs_is_peer_v5()) { @@ -1349,7 +1421,9 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb, case LGS_CKPT_OPEN_STREAM: /* 4 */ TRACE_2("\tSTREAM OPEN: UPDATE"); - if (lgs_is_peer_v6()) { + if (lgs_is_peer_v8()) { + stream_open = &ckpt_msg_v8->ckpt_rec.stream_open; + } else if (lgs_is_peer_v6()) { stream_open = &ckpt_msg_v6->ckpt_rec.stream_open; } else if (lgs_is_peer_v5()) { stream_open = &ckpt_msg_v5->ckpt_rec.stream_open; @@ -1406,6 +1480,27 @@ static uint32_t ckpt_decode_async_update(lgs_cb_t *cb, goto done; } break; + case LGS_CKPT_PUSH_ASYNC: + TRACE("LGS_CKPT_PUSH_ASYNC"); + rc = DecodePushAsync(cb, ckpt_msg, cbk_arg); + if (rc != NCSCC_RC_SUCCESS) { + goto done; + } + break; + case LGS_CKPT_POP_ASYNC: + TRACE("LGS_CKPT_POP_ASYNC"); + rc = DecodePopAsync(cb, ckpt_msg, cbk_arg); + if (rc != NCSCC_RC_SUCCESS) { + goto 
done; + } + break; + case LGS_CKPT_POP_WRITE_ASYNC: + TRACE("LGS_CKPT_POP_WRITE_ASYNC"); + rc = DecodePopAndWriteAsync(cb, ckpt_msg, cbk_arg); + if (rc != NCSCC_RC_SUCCESS) { + goto done; + } + break; default: rc = NCSCC_RC_FAILURE; TRACE("\tFAILED Unknown ckpt record type"); @@ -1446,6 +1541,7 @@ static uint32_t ckpt_decode_cold_sync(lgs_cb_t *cb, NCS_MBCSV_CB_ARG *cbk_arg) { lgsv_ckpt_msg_v1_t msg_v1; lgsv_ckpt_msg_v2_t msg_v2; lgsv_ckpt_msg_v6_t msg_v6; + lgsv_ckpt_msg_v8_t msg_v8; uint32_t num_rec = 0; void *reg_rec = NULL; lgs_ckpt_stream_open_t *stream_rec = NULL; @@ -1467,8 +1563,16 @@ static uint32_t ckpt_decode_cold_sync(lgs_cb_t *cb, NCS_MBCSV_CB_ARG *cbk_arg) { | Header|RegRecords1..n|Header|streamRecords1..n| ------------------------------------------------- */ - - if (lgs_is_peer_v6()) { + if (lgs_is_peer_v8()) { + lgsv_ckpt_msg_v8_t *data_v8 = &msg_v8; + header = &data_v8->header; + initialize_client_rec_ptr = &data_v8->ckpt_rec.initialize_client; + stream_open_rec_ptr = &data_v8->ckpt_rec.stream_open; + vdata = data_v8; + vckpt_rec = &data_v8->ckpt_rec; + ckpt_rec_size = sizeof(data_v8->ckpt_rec); + edp_function_reg = edp_ed_reg_rec_v6; + } else if (lgs_is_peer_v6()) { lgsv_ckpt_msg_v6_t *data_v6 = &msg_v6; header = &data_v6->header; initialize_client_rec_ptr = &data_v6->ckpt_rec.initialize_client; @@ -1571,6 +1675,13 @@ static uint32_t ckpt_decode_cold_sync(lgs_cb_t *cb, NCS_MBCSV_CB_ARG *cbk_arg) { --num_rec; } /*End while, stream records */ + rc = Cache::instance()->DecodeColdSync(&cbk_arg->info.decode.i_uba, header, + vdata, &vckpt_rec, ckpt_rec_size); + if (rc != NCSCC_RC_SUCCESS) { + LOG_NO("DecodeColdSync failed"); + goto done; + } + /* Get the async update count */ ptr = ncs_dec_flatten_space(&cbk_arg->info.decode.i_uba, data_cnt, sizeof(uint32_t)); @@ -1605,7 +1716,7 @@ done: * Notes : None. 
*****************************************************************************/ -static uint32_t process_ckpt_data(lgs_cb_t *cb, void *data) { +uint32_t process_ckpt_data(lgs_cb_t *cb, void *data) { uint32_t rc = NCSCC_RC_SUCCESS; lgsv_ckpt_msg_type_t lgsv_ckpt_msg_type; lgsv_ckpt_msg_v1_t *data_v1; @@ -1613,13 +1724,17 @@ static uint32_t process_ckpt_data(lgs_cb_t *cb, void *data) { lgsv_ckpt_msg_v3_t *data_v3; lgsv_ckpt_msg_v5_t *data_v5; lgsv_ckpt_msg_v6_t *data_v6; + lgsv_ckpt_msg_v8_t *data_v8; if ((!cb) || (data == NULL)) { TRACE("%s - FAILED: (!cb) || (data == NULL)", __FUNCTION__); return (rc = NCSCC_RC_FAILURE); } - if (lgs_is_peer_v6()) { + if (lgs_is_peer_v8()) { + data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data); + lgsv_ckpt_msg_type = data_v8->header.ckpt_rec_type; + } else if (lgs_is_peer_v6()) { data_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(data); lgsv_ckpt_msg_type = data_v6->header.ckpt_rec_type; } else if (lgs_is_peer_v5()) { @@ -1673,7 +1788,30 @@ static uint32_t ckpt_proc_initialize_client(lgs_cb_t *cb, void *data) { log_client_t *client; TRACE_ENTER(); - if (lgs_is_peer_v6()) { + if (lgs_is_peer_v8()) { + lgs_ckpt_initialize_msg_v6_t *param; + lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data); + param = &data_v8->ckpt_rec.initialize_client; + + TRACE("client ID: %d", param->client_id); + client = lgs_client_get_by_id(param->client_id); + if (client == NULL) { + /* Client does not exist, create new one */ + if ((client = lgs_client_new(param->mds_dest, param->client_id, + param->stream_list)) == NULL) { + /* Do not allow standby to get out of sync */ + lgs_exit("Could not create new client", SA_AMF_COMPONENT_RESTART); + } else { + client->client_ver = param->client_ver; + } + } else { + /* Client with ID already exist, check other attributes */ + if (client->mds_dest != param->mds_dest) { + /* Do not allow standby to get out of sync */ + lgs_exit("Client attributes differ", SA_AMF_COMPONENT_RESTART); + } + } + } else if 
(lgs_is_peer_v6()) { lgs_ckpt_initialize_msg_v6_t *param; lgsv_ckpt_msg_v6_t *data_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(data); param = &data_v6->ckpt_rec.initialize_client; @@ -1873,6 +2011,70 @@ done: } } +uint32_t WriteOnStandby(log_stream_t* stream, uint64_t timestamp, + char* file_current, char* logRecord) { + +#ifdef SIMULATE_NFS_UNRESPONSE + // NOTE(vu.m.nguyen): not thread-safe, but only for test. + // This is to sync the counter b/w active and standby. + test_counter++; +#endif + + int rc = 0; + /* If configured for split file system log records shall be written also if + * we are standby. + */ + if (lgs_is_split_file_system() && (logRecord != nullptr)) { + size_t rec_len = strlen(logRecord); + stream->act_last_close_timestamp = timestamp; + + /* Check if record id numbering is inconsistent. If so there are + * possible missed log records and a notification shall be inserted + * in log file. + */ + if ((stream->stb_logRecordId + 1) != stream->logRecordId) { + insert_localmsg_in_stream( + stream, const_cast<char *>("Possible loss of log record")); + } + + /* Make a limited number of attempts to write if file IO timed out when + * trying to write the log record. + */ + rc = log_stream_write_h(stream, logRecord, rec_len); + if (rc != 0) { + TRACE("\tError %d when writing log record", rc); + } + + stream->stb_logRecordId = stream->logRecordId; + } /* END lgs_is_split_file_system */ + + /* + Since the logRecord is referred by the log handler thread, in time-out case, + the log API thread might be still using the log record memory. + + To make sure there is no corruption of memory usage in case of time-out (rc + = -2), We leave the log record memory freed to the log handler thread.. + + It is never a good idea to allocate and free memory in different places. + But consider it as a trade-off to have a better performance of LOGsv + as time-out occurs very rarely. + + Other cases, the allocator frees it. 
+ */ + if ((rc != -2) && (logRecord != NULL)) { + lgs_free_edu_mem(logRecord); + logRecord = NULL; + } + + lgs_free_edu_mem(file_current); + + /* + If rc == -2, means something happens in log handler thread + return TIMEOUT error, so that the caller will try again. + */ + return (rc == -2 ? NCSCC_RC_REQ_TIMOUT : NCSCC_RC_SUCCESS); +} + /**************************************************************************** * Name : ckpt_proc_log_write * @@ -1897,11 +2099,19 @@ static uint32_t ckpt_proc_log_write(lgs_cb_t *cb, void *data) { char *logFileCurrent; char *logRecord = NULL; uint64_t c_file_close_time_stamp = 0; - int rc = 0; TRACE_ENTER(); - if (lgs_is_peer_v2()) { + if (lgs_is_peer_v8()) { + lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data); + streamId = data_v8->ckpt_rec.write_log.streamId; + recordId = data_v8->ckpt_rec.write_log.recordId; + curFileSize = data_v8->ckpt_rec.write_log.curFileSize; + logFileCurrent = data_v8->ckpt_rec.write_log.logFileCurrent; + logRecord = data_v8->ckpt_rec.write_log.logRecord; + c_file_close_time_stamp = + data_v8->ckpt_rec.write_log.c_file_close_time_stamp; + } else if (lgs_is_peer_v2()) { lgsv_ckpt_msg_v2_t *data_v2 = static_cast<lgsv_ckpt_msg_v2_t *>(data); streamId = data_v2->ckpt_rec.write_log.streamId; recordId = data_v2->ckpt_rec.write_log.recordId; @@ -1921,67 +2131,17 @@ static uint32_t ckpt_proc_log_write(lgs_cb_t *cb, void *data) { stream = log_stream_get_by_id(streamId); if (stream == NULL) { TRACE("Could not lookup stream: %u", streamId); - goto done; + lgs_free_edu_mem(logRecord); + lgs_free_edu_mem(logFileCurrent); + return NCSCC_RC_SUCCESS; } stream->logRecordId = recordId; stream->curFileSize = curFileSize; stream->logFileCurrent = logFileCurrent; - /* If configured for split file system log records shall be written also if - * we are standby. 
- */ - if (lgs_is_split_file_system() && (logRecord != nullptr)) { - size_t rec_len = strlen(logRecord); - stream->act_last_close_timestamp = c_file_close_time_stamp; - - /* Check if record id numbering is inconsistent. If so there are - * possible missed log records and a notification shall be inserted - * in log file. - */ - if ((stream->stb_logRecordId + 1) != recordId) { - insert_localmsg_in_stream( - stream, const_cast<char *>("Possible loss of log record")); - } - - /* Make a limited number of attempts to write if file IO timed out when - * trying to write the log record. - */ - rc = log_stream_write_h(stream, logRecord, rec_len); - if (rc != 0) { - TRACE("\tError %d when writing log record", rc); - } - - stream->stb_logRecordId = recordId; - } /* END lgs_is_split_file_system */ - -done: - /* - Since the logRecord is referred by the log handler thread, in time-out case, - the log API thread might be still using the log record memory. - - To make sure there is no corruption of memory usage in case of time-out (rc - = -2), We leave the log record memory freed to the log handler thread.. - - It is never a good idea to allocate and free memory in different places. - But consider it as a trade-off to have a better performance of LOGsv - as time-out occurs very rarely. - - Other cases, the allocator frees it. - */ - if ((rc != -2) && (logRecord != NULL)) { - lgs_free_edu_mem(logRecord); - logRecord = NULL; - } - - lgs_free_edu_mem(logFileCurrent); - - TRACE_LEAVE(); - /* - If rc == -2, means something happens in log handler thread - return TIMEOUT error, so that the caller will try again. - */ - return (rc == -2 ? 
NCSCC_RC_REQ_TIMOUT : NCSCC_RC_SUCCESS); + return WriteOnStandby(stream, c_file_close_time_stamp, + logFileCurrent, logRecord); } /**************************************************************************** @@ -2007,7 +2167,14 @@ static uint32_t ckpt_proc_close_stream(lgs_cb_t *cb, void *data) { TRACE_ENTER(); - if (lgs_is_peer_v2()) { + if (lgs_is_peer_v8()) { + lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data); + streamId = data_v8->ckpt_rec.stream_close.streamId; + clientId = data_v8->ckpt_rec.stream_close.clientId; + /* Set time for closing. Used for renaming */ + closetime_ptr = reinterpret_cast<time_t *>( + &data_v8->ckpt_rec.stream_close.c_file_close_time_stamp); + } else if (lgs_is_peer_v2()) { lgsv_ckpt_msg_v2_t *data_v2 = static_cast<lgsv_ckpt_msg_v2_t *>(data); streamId = data_v2->ckpt_rec.stream_close.streamId; clientId = data_v2->ckpt_rec.stream_close.clientId; @@ -2063,7 +2230,10 @@ uint32_t ckpt_proc_open_stream(lgs_cb_t *cb, void *data) { TRACE_ENTER(); - if (lgs_is_peer_v6()) { + if (lgs_is_peer_v8()) { + lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data); + param = &data_v8->ckpt_rec.stream_open; + } else if (lgs_is_peer_v6()) { lgsv_ckpt_msg_v6_t *data_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(data); param = &data_v6->ckpt_rec.stream_open; } else if (lgs_is_peer_v2()) { @@ -2218,7 +2388,12 @@ static uint32_t ckpt_proc_finalize_client(lgs_cb_t *cb, void *data) { uint32_t client_id; time_t *closetime_ptr; - if (lgs_is_peer_v2()) { + if (lgs_is_peer_v8()) { + lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data); + lgs_ckpt_finalize_msg_v2_t *param = &data_v8->ckpt_rec.finalize_client; + closetime_ptr = reinterpret_cast<time_t *>(&param->c_file_close_time_stamp); + client_id = param->client_id; + } else if (lgs_is_peer_v2()) { lgsv_ckpt_msg_v2_t *data_v2 = static_cast<lgsv_ckpt_msg_v2_t *>(data); lgs_ckpt_finalize_msg_v2_t *param = &data_v2->ckpt_rec.finalize_client; closetime_ptr =
reinterpret_cast<time_t *>(&param->c_file_close_time_stamp); @@ -2260,7 +2435,12 @@ uint32_t ckpt_proc_agent_down(lgs_cb_t *cb, void *data) { TRACE_ENTER(); - if (lgs_is_peer_v2()) { + if (lgs_is_peer_v8()) { + lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data); + closetime_ptr = reinterpret_cast<time_t *>( + &data_v8->ckpt_rec.agent_down.c_file_close_time_stamp); + agent_dest = data_v8->ckpt_rec.agent_down.agent_dest; + } else if (lgs_is_peer_v2()) { lgsv_ckpt_msg_v2_t *data_v2 = static_cast<lgsv_ckpt_msg_v2_t *>(data); closetime_ptr = reinterpret_cast<time_t *>( &data_v2->ckpt_rec.agent_down.c_file_close_time_stamp); @@ -2320,7 +2500,22 @@ static uint32_t ckpt_proc_cfg_stream(lgs_cb_t *cb, void *data) { TRACE_ENTER(); - if (lgs_is_peer_v6()) { + if (lgs_is_peer_v8()) { + lgsv_ckpt_msg_v8_t *data_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(data); + name = data_v8->ckpt_rec.stream_cfg.name; + fileName = data_v8->ckpt_rec.stream_cfg.fileName; + pathName = data_v8->ckpt_rec.stream_cfg.pathName; + maxLogFileSize = data_v8->ckpt_rec.stream_cfg.maxLogFileSize; + fixedLogRecordSize = data_v8->ckpt_rec.stream_cfg.fixedLogRecordSize; + logFullAction = data_v8->ckpt_rec.stream_cfg.logFullAction; + logFullHaltThreshold = data_v8->ckpt_rec.stream_cfg.logFullHaltThreshold; + maxFilesRotated = data_v8->ckpt_rec.stream_cfg.maxFilesRotated; + logFileFormat = data_v8->ckpt_rec.stream_cfg.logFileFormat; + severityFilter = data_v8->ckpt_rec.stream_cfg.severityFilter; + logFileCurrent = data_v8->ckpt_rec.stream_cfg.logFileCurrent; + dest_names = data_v8->ckpt_rec.stream_cfg.dest_names; + closetime = data_v8->ckpt_rec.stream_cfg.c_file_close_time_stamp; + } else if (lgs_is_peer_v6()) { lgsv_ckpt_msg_v6_t *data_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(data); name = data_v6->ckpt_rec.stream_cfg.name; fileName = data_v6->ckpt_rec.stream_cfg.fileName; @@ -2495,7 +2690,11 @@ uint32_t lgs_ckpt_send_async(lgs_cb_t *cb, void *ckpt_rec, uint32_t action) { TRACE_ENTER(); - if
(lgs_is_peer_v6()) { + if (lgs_is_peer_v8()) { + lgsv_ckpt_msg_v8_t *ckpt_rec_v8 = + static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_rec); + ckpt_rec_type = ckpt_rec_v8->header.ckpt_rec_type; + } else if (lgs_is_peer_v6()) { lgsv_ckpt_msg_v6_t *ckpt_rec_v6 = static_cast<lgsv_ckpt_msg_v6_t *>(ckpt_rec); ckpt_rec_type = ckpt_rec_v6->header.ckpt_rec_type; @@ -2799,7 +2998,10 @@ int32_t ckpt_msg_test_type(NCSCONTEXT arg) { LCL_TEST_JUMP_OFFSET_LGS_CKPT_CLOSE_STREAM, LCL_TEST_JUMP_OFFSET_LGS_CKPT_AGENT_DOWN, LCL_TEST_JUMP_OFFSET_LGS_CKPT_CFG_STREAM, - LCL_TEST_JUMP_OFFSET_LGS_CKPT_LGS_CFG + LCL_TEST_JUMP_OFFSET_LGS_CKPT_LGS_CFG, + LCL_TEST_JUMP_OFFSET_LGS_CKPT_PUSH_ASYNC, + LCL_TEST_JUMP_OFFSET_LGS_CKPT_POP_ASYNC, + LCL_TEST_JUMP_OFFSET_LGS_CKPT_POP_WRITE_ASYNC }; lgsv_ckpt_msg_type_t ckpt_rec_type; @@ -2825,6 +3027,12 @@ int32_t ckpt_msg_test_type(NCSCONTEXT arg) { case LGS_CKPT_LGS_CFG_V3: case LGS_CKPT_LGS_CFG_V5: return LCL_TEST_JUMP_OFFSET_LGS_CKPT_LGS_CFG; + case LGS_CKPT_PUSH_ASYNC: + return LCL_TEST_JUMP_OFFSET_LGS_CKPT_PUSH_ASYNC; + case LGS_CKPT_POP_ASYNC: + return LCL_TEST_JUMP_OFFSET_LGS_CKPT_POP_ASYNC; + case LGS_CKPT_POP_WRITE_ASYNC: + return LCL_TEST_JUMP_OFFSET_LGS_CKPT_POP_WRITE_ASYNC; default: return EDU_EXIT; break; @@ -2867,7 +3075,7 @@ static void enc_ckpt_header(uint8_t *pdata, lgsv_ckpt_header_t header) { * Notes : None. 
*****************************************************************************/ -static uint32_t dec_ckpt_header(NCS_UBAID *uba, lgsv_ckpt_header_t *header) { +uint32_t dec_ckpt_header(NCS_UBAID *uba, lgsv_ckpt_header_t *header) { uint8_t *p8; uint8_t local_data[256]; @@ -2895,3 +3103,60 @@ static uint32_t dec_ckpt_header(NCS_UBAID *uba, lgsv_ckpt_header_t *header) { return NCSCC_RC_SUCCESS; } /*End lgs_dec_ckpt_header */ + +void lgs_ckpt_log_async(log_stream_t* stream, char* record) { + void *ckpt_ptr = nullptr; + if (lgs_cb->ha_state == SA_AMF_HA_ACTIVE) { + lgsv_ckpt_msg_v1_t ckpt_v1; + lgsv_ckpt_msg_v2_t ckpt_v2; + lgsv_ckpt_msg_v8_t ckpt_v8; + if (lgs_is_peer_v8()) { + memset(&ckpt_v8, 0, sizeof(ckpt_v8)); + ckpt_v8.header.ckpt_rec_type = LGS_CKPT_LOG_WRITE; + ckpt_v8.header.num_ckpt_records = 1; + ckpt_v8.header.data_len = 1; + ckpt_v8.ckpt_rec.write_log.recordId = stream->logRecordId; + ckpt_v8.ckpt_rec.write_log.streamId = stream->streamId; + ckpt_v8.ckpt_rec.write_log.curFileSize = stream->curFileSize; + ckpt_v8.ckpt_rec.write_log.logFileCurrent = + const_cast<char *>(stream->logFileCurrent.c_str()); + ckpt_v8.ckpt_rec.write_log.logRecord = record; + ckpt_v8.ckpt_rec.write_log.c_file_close_time_stamp = + stream->act_last_close_timestamp; + ckpt_ptr = &ckpt_v8; + } else if (lgs_is_peer_v2()) { + memset(&ckpt_v2, 0, sizeof(ckpt_v2)); + ckpt_v2.header.ckpt_rec_type = LGS_CKPT_LOG_WRITE; + ckpt_v2.header.num_ckpt_records = 1; + ckpt_v2.header.data_len = 1; + ckpt_v2.ckpt_rec.write_log.recordId = stream->logRecordId; + ckpt_v2.ckpt_rec.write_log.streamId = stream->streamId; + ckpt_v2.ckpt_rec.write_log.curFileSize = stream->curFileSize; + ckpt_v2.ckpt_rec.write_log.logFileCurrent = + const_cast<char *>(stream->logFileCurrent.c_str()); + ckpt_v2.ckpt_rec.write_log.logRecord = record; + ckpt_v2.ckpt_rec.write_log.c_file_close_time_stamp = + stream->act_last_close_timestamp; + ckpt_ptr = &ckpt_v2; + } else { + memset(&ckpt_v1, 0, sizeof(ckpt_v1)); + 
ckpt_v1.header.ckpt_rec_type = LGS_CKPT_LOG_WRITE; + ckpt_v1.header.num_ckpt_records = 1; + ckpt_v1.header.data_len = 1; + ckpt_v1.ckpt_rec.write_log.recordId = stream->logRecordId; + ckpt_v1.ckpt_rec.write_log.streamId = stream->streamId; + ckpt_v1.ckpt_rec.write_log.curFileSize = stream->curFileSize; + ckpt_v1.ckpt_rec.write_log.logFileCurrent = + const_cast<char *>(stream->logFileCurrent.c_str()); + ckpt_ptr = &ckpt_v1; + } + + (void)lgs_ckpt_send_async(lgs_cb, ckpt_ptr, NCS_MBCSV_ACT_ADD); + } + + /* Save stb_recordId. Used by standby if configured for split file system. + * It's saved here in order to contain a correct value if this node becomes + * standby. + */ + stream->stb_logRecordId = stream->logRecordId; +} diff --git a/src/log/logd/lgs_mbcsv.h b/src/log/logd/lgs_mbcsv.h index 5bbd616bc..998e843e4 100644 --- a/src/log/logd/lgs_mbcsv.h +++ b/src/log/logd/lgs_mbcsv.h @@ -20,6 +20,8 @@ #ifndef LOG_LOGD_LGS_MBCSV_H_ #define LOG_LOGD_LGS_MBCSV_H_ +#include "log/logd/lgs_stream.h" + #include <stdint.h> #include <saAmf.h> @@ -40,9 +42,10 @@ #define LGS_MBCSV_VERSION_5 5 #define LGS_MBCSV_VERSION_6 6 #define LGS_MBCSV_VERSION_7 7 +#define LGS_MBCSV_VERSION_8 8 /* Current version */ -#define LGS_MBCSV_VERSION 7 +#define LGS_MBCSV_VERSION 8 #define LGS_MBCSV_VERSION_MIN 1 /* Checkpoint message types(Used as 'reotype' w.r.t mbcsv) */ @@ -63,6 +66,9 @@ typedef enum { LGS_CKPT_LGS_CFG = 7, LGS_CKPT_LGS_CFG_V3 = 8, LGS_CKPT_LGS_CFG_V5 = 9, + LGS_CKPT_PUSH_ASYNC, + LGS_CKPT_POP_ASYNC, + LGS_CKPT_POP_WRITE_ASYNC, LGS_CKPT_MSG_MAX } lgsv_ckpt_msg_type_t; @@ -114,6 +120,7 @@ bool lgs_is_peer_v6(); // New numeric values added to logStreamTypeT used in the // lgs_ckpt_stream_open_t structure bool lgs_is_peer_v7(); +bool lgs_is_peer_v8(); bool lgs_is_split_file_system(); uint32_t lgs_mbcsv_dispatch(NCS_MBCSV_HDL mbcsv_hdl); @@ -138,4 +145,14 @@ uint32_t edp_ed_open_stream_rec(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, EDU_BUF_ENV *buf_env, EDP_OP_TYPE op, EDU_ERR *o_err); +uint32_t
ckpt_decode_log_struct(lgs_cb_t *cb, NCS_MBCSV_CB_ARG *cbk_arg, + void *ckpt_msg, void *struct_ptr, + EDU_PROG_HANDLER edp_function); +void lgs_ckpt_log_async(log_stream_t* stream, char* record); +uint32_t dec_ckpt_header(NCS_UBAID *uba, lgsv_ckpt_header_t *header); +uint32_t process_ckpt_data(lgs_cb_t *cb, void *data); +uint32_t WriteOnStandby(log_stream_t* stream, uint64_t timestamp, + char* file_current, char* logRecord); + + #endif // LOG_LOGD_LGS_MBCSV_H_ diff --git a/src/log/logd/lgs_mbcsv_cache.cc b/src/log/logd/lgs_mbcsv_cache.cc new file mode 100644 index 000000000..41819163b --- /dev/null +++ b/src/log/logd/lgs_mbcsv_cache.cc @@ -0,0 +1,372 @@ +/* -*- OpenSAF -*- + * + * Copyright Ericsson AB 2019 - All Rights Reserved. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed + * under the GNU Lesser General Public License Version 2.1, February 1999. + * The complete license can be accessed from the following location: + * http://opensource.org/licenses/lgpl-license.php + * See the Copying file included with the OpenSAF distribution for full + * licensing terms. 
+ * + * Author(s): Ericsson AB + * + */ + +#include "log/logd/lgs_mbcsv_cache.h" +#include "log/logd/lgs_cache.h" + +uint32_t EncodeDecodePushAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn, + NCSCONTEXT ptr, uint32_t* ptr_data_len, + EDU_BUF_ENV* buf_env, EDP_OP_TYPE op, + EDU_ERR* o_err) { + TRACE_ENTER(); + CkptPushAsync* ckpt_push_async = NULL; + CkptPushAsync** ckpt_push_async_dec_ptr; + EDU_INST_SET ckpt_push_async_rec_ed_rules[] = { + {EDU_START, EncodeDecodePushAsync, 0, 0, 0, + sizeof(CkptPushAsync), 0, NULL}, + + {EDU_EXEC, ncs_edp_uns64, 0, 0, 0, + (long)&((CkptPushAsync*)0)->invocation, 0, NULL}, + {EDU_EXEC, ncs_edp_uns32, 0, 0, 0, + (long)&((CkptPushAsync*)0)->ack_flags, 0, NULL}, + {EDU_EXEC, ncs_edp_uns32, 0, 0, 0, + (long)&((CkptPushAsync*)0)->client_id, 0, NULL}, + {EDU_EXEC, ncs_edp_uns32, 0, 0, 0, + (long)&((CkptPushAsync*)0)->stream_id, 0, NULL}, + {EDU_EXEC, ncs_edp_string, 0, 0, 0, + (long)&((CkptPushAsync*)0)->svc_name, 0, NULL}, + {EDU_EXEC, ncs_edp_uns64, 0, 0, 0, + (long)&((CkptPushAsync*)0)->log_stamp, 0, NULL}, + {EDU_EXEC, ncs_edp_uns16, 0, 0, 0, + (long)&((CkptPushAsync*)0)->severity, 0, NULL}, + {EDU_EXEC, ncs_edp_uns64, 0, 0, 0, + (long)&((CkptPushAsync*)0)->dest, 0, NULL}, + {EDU_EXEC, ncs_edp_string, 0, 0, 0, + (long)&((CkptPushAsync*)0)->from_node, 0, NULL}, + {EDU_EXEC, ncs_edp_uns64, 0, 0, 0, + (long)&((CkptPushAsync*)0)->queue_at, 0, NULL}, + {EDU_EXEC, ncs_edp_uns64, 0, 0, 0, + (long)&((CkptPushAsync*)0)->seq_id, 0, NULL}, + {EDU_EXEC, ncs_edp_string, 0, 0, 0, + (long)&((CkptPushAsync*)0)->log_record, 0, NULL}, + + {EDU_END, 0, 0, 0, 0, 0, 0, NULL}, + }; + + if (op == EDP_OP_TYPE_ENC) { + ckpt_push_async = static_cast<CkptPushAsync*>(ptr); + } else if (op == EDP_OP_TYPE_DEC) { + ckpt_push_async_dec_ptr = static_cast<CkptPushAsync**>(ptr); + if (*ckpt_push_async_dec_ptr == NULL) { + *o_err = EDU_ERR_MEM_FAIL; + return NCSCC_RC_FAILURE; + } + memset(*ckpt_push_async_dec_ptr, 0, sizeof(CkptPushAsync)); + ckpt_push_async = 
*ckpt_push_async_dec_ptr;
+  } else {
+    ckpt_push_async = static_cast<CkptPushAsync*>(ptr);
+  }
+
+  uint32_t rc = m_NCS_EDU_RUN_RULES(edu_hdl, edu_tkn,
+                                    ckpt_push_async_rec_ed_rules,
+                                    ckpt_push_async, ptr_data_len, buf_env,
+                                    op, o_err);
+  return rc;
+}
+
+// EDU program for a CkptPopAsync record, which carries only the 64-bit
+// `seq_id`.
+//
+// On encode `ptr` is the CkptPopAsync to serialize. On decode `ptr` is a
+// CkptPopAsync** whose target must already be allocated; the target is
+// zeroed before the rules run.
+//
+// Returns the result of m_NCS_EDU_RUN_RULES, or NCSCC_RC_FAILURE with
+// *o_err = EDU_ERR_MEM_FAIL when the decode target is NULL.
+uint32_t EncodeDecodePopAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn,
+                              NCSCONTEXT ptr, uint32_t* ptr_data_len,
+                              EDU_BUF_ENV* buf_env, EDP_OP_TYPE op,
+                              EDU_ERR* o_err) {
+  TRACE_ENTER();
+  CkptPopAsync* ckpt_pop_async = NULL, **ckpt_pop_async_dec_ptr;
+  EDU_INST_SET ckpt_pop_data_rec_ed_rules[] = {
+    {EDU_START, EncodeDecodePopAsync, 0, 0, 0,
+     sizeof(CkptPopAsync), 0, NULL},
+    {EDU_EXEC, ncs_edp_uns64, 0, 0, 0,
+     (long)&((CkptPopAsync*)0)->seq_id, 0, NULL},
+    {EDU_END, 0, 0, 0, 0, 0, 0, NULL},
+  };
+
+  if (op == EDP_OP_TYPE_ENC) {
+    ckpt_pop_async = static_cast<CkptPopAsync*>(ptr);
+  } else if (op == EDP_OP_TYPE_DEC) {
+    ckpt_pop_async_dec_ptr = static_cast<CkptPopAsync**>(ptr);
+    if (*ckpt_pop_async_dec_ptr == NULL) {
+      *o_err = EDU_ERR_MEM_FAIL;
+      return NCSCC_RC_FAILURE;
+    }
+    memset(*ckpt_pop_async_dec_ptr, 0, sizeof(CkptPopAsync));
+    ckpt_pop_async = *ckpt_pop_async_dec_ptr;
+  } else {
+    ckpt_pop_async = static_cast<CkptPopAsync*>(ptr);
+  }
+
+  return m_NCS_EDU_RUN_RULES(edu_hdl, edu_tkn, ckpt_pop_data_rec_ed_rules,
+                             ckpt_pop_async, ptr_data_len, buf_env,
+                             op, o_err);
+}
+
+// EDU program for a CkptPopAndWriteAsync record. Fields are processed in
+// this order: stream_id, record_id, file_size (32-bit), log_file, log_record
+// (strings), timestamp, seq_id (64-bit).
+//
+// On encode `ptr` is the CkptPopAndWriteAsync to serialize. On decode `ptr`
+// is a CkptPopAndWriteAsync** whose target must already be allocated; the
+// target is zeroed before the rules run.
+//
+// Returns the result of m_NCS_EDU_RUN_RULES, or NCSCC_RC_FAILURE with
+// *o_err = EDU_ERR_MEM_FAIL when the decode target is NULL.
+uint32_t EncodeDecodePopAndWriteAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn,
+                                      NCSCONTEXT ptr, uint32_t* ptr_data_len,
+                                      EDU_BUF_ENV* buf_env, EDP_OP_TYPE op,
+                                      EDU_ERR* o_err) {
+  TRACE_ENTER();
+  CkptPopAndWriteAsync* ckpt_pop_and_write_async = NULL;
+  CkptPopAndWriteAsync** ckpt_pop_and_write_async_dec_ptr;
+  EDU_INST_SET ckpt_pop_and_write_async_rec_ed_rules[] = {
+    {EDU_START, EncodeDecodePopAndWriteAsync, 0, 0, 0,
+     sizeof(CkptPopAndWriteAsync), 0, NULL},
+    {EDU_EXEC, ncs_edp_uns32, 0, 0, 0,
+     (long)&((CkptPopAndWriteAsync*)0)->stream_id, 0, NULL},
+    {EDU_EXEC, ncs_edp_uns32, 0, 0, 0,
+     (long)&((CkptPopAndWriteAsync*)0)->record_id, 0, NULL},
+    {EDU_EXEC, ncs_edp_uns32, 0, 0, 0,
+     (long)&((CkptPopAndWriteAsync*)0)->file_size, 0, NULL},
+    {EDU_EXEC, ncs_edp_string, 0, 0, 0,
+     (long)&((CkptPopAndWriteAsync*)0)->log_file, 0, NULL},
+    {EDU_EXEC, ncs_edp_string, 0, 0, 0,
+     (long)&((CkptPopAndWriteAsync*)0)->log_record, 0, NULL},
+    {EDU_EXEC, ncs_edp_uns64, 0, 0, 0,
+     (long)&((CkptPopAndWriteAsync*)0)->timestamp, 0, NULL},
+    {EDU_EXEC, ncs_edp_uns64, 0, 0, 0,
+     (long)&((CkptPopAndWriteAsync*)0)->seq_id, 0, NULL},
+    {EDU_END, 0, 0, 0, 0, 0, 0, NULL},
+  };
+
+  if (op == EDP_OP_TYPE_ENC) {
+    ckpt_pop_and_write_async = static_cast<CkptPopAndWriteAsync*>(ptr);
+  } else if (op == EDP_OP_TYPE_DEC) {
+    ckpt_pop_and_write_async_dec_ptr = static_cast<CkptPopAndWriteAsync**>(ptr);
+    if (*ckpt_pop_and_write_async_dec_ptr == NULL) {
+      *o_err = EDU_ERR_MEM_FAIL;
+      return NCSCC_RC_FAILURE;
+    }
+    memset(*ckpt_pop_and_write_async_dec_ptr, 0, sizeof(CkptPopAndWriteAsync));
+    ckpt_pop_and_write_async = *ckpt_pop_and_write_async_dec_ptr;
+  } else {
+    ckpt_pop_and_write_async = static_cast<CkptPopAndWriteAsync*>(ptr);
+  }
+
+  return m_NCS_EDU_RUN_RULES(edu_hdl, edu_tkn,
+                             ckpt_pop_and_write_async_rec_ed_rules,
+                             ckpt_pop_and_write_async, ptr_data_len, buf_env,
+                             op, o_err);
+}
+
+// Decodes a push_async record into `ckpt_rec.push_async` of the V8 message
+// `ckpt_msg`, delegating to ckpt_decode_log_struct() with
+// EncodeDecodePushAsync. Asserts that the peer runs V8.
+uint32_t DecodePushAsync(lgs_cb_t* cb, void* ckpt_msg,
+                         NCS_MBCSV_CB_ARG* cbk_arg) {
+  assert(lgs_is_peer_v8());
+  TRACE_ENTER();
+  auto ckpt_msg_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
+  auto data = &ckpt_msg_v8->ckpt_rec.push_async;
+  return ckpt_decode_log_struct(cb, cbk_arg, ckpt_msg, data,
+                                EncodeDecodePushAsync);
+}
+
+// Decodes a pop_async record into `ckpt_rec.pop_async` of the V8 message
+// `ckpt_msg`, delegating to ckpt_decode_log_struct() with
+// EncodeDecodePopAsync. Asserts that the peer runs V8.
+uint32_t DecodePopAsync(lgs_cb_t* cb, void* ckpt_msg,
+                        NCS_MBCSV_CB_ARG* cbk_arg) {
+  assert(lgs_is_peer_v8());
+  TRACE_ENTER();  // traced like DecodePushAsync
+  auto ckpt_msg_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
+  auto data = &ckpt_msg_v8->ckpt_rec.pop_async;
+  return ckpt_decode_log_struct(cb, cbk_arg, ckpt_msg, data,
+                                EncodeDecodePopAsync);
+}
+
+// Decodes a pop_and_write_async record into `ckpt_rec.pop_and_write_async`
+// of the V8 message `ckpt_msg`, delegating to ckpt_decode_log_struct() with
+// EncodeDecodePopAndWriteAsync. Asserts that the peer runs V8.
+uint32_t DecodePopAndWriteAsync(lgs_cb_t* cb, void* ckpt_msg,
+                                NCS_MBCSV_CB_ARG* cbk_arg) {
+  assert(lgs_is_peer_v8());
+  TRACE_ENTER();  // traced like DecodePushAsync
+  auto ckpt_msg_v8 = static_cast<lgsv_ckpt_msg_v8_t *>(ckpt_msg);
+  auto data = &ckpt_msg_v8->ckpt_rec.pop_and_write_async;
+  return ckpt_decode_log_struct(cb, cbk_arg, ckpt_msg, data,
+                                EncodeDecodePopAndWriteAsync);
+}
+
+// Handles a decoded push_async checkpoint: builds a Cache::Data from the
+// record and pushes it onto the Cache singleton.
+//
+// `data` must point to a lgsv_ckpt_msg_v8_t; `cb` is not used.
+// Always returns NCSCC_RC_SUCCESS.
+uint32_t ckpt_proc_push_async(lgs_cb_t* cb, void* data) {
+  TRACE_ENTER();
+  assert(lgs_is_peer_v8() && "The peer should run with V8 or beyond!");
+  auto data_v8 = static_cast<lgsv_ckpt_msg_v8_t*>(data);
+  auto param = &data_v8->ckpt_rec.push_async;
+  //Dump(param);
+  // NOTE(review): the strings are released right below, so Cache::Data
+  // presumably takes its own copies - confirm against lgs_cache.h.
+  auto cache = std::make_shared<Cache::Data>(param);
+  Cache::instance()->Push(cache);
+  // Remember to free memory for string types that are allocated by
+  // the underlying edu layer.
+  lgs_free_edu_mem(param->log_record);
+  lgs_free_edu_mem(param->from_node);
+  lgs_free_edu_mem(param->svc_name);
+  return NCSCC_RC_SUCCESS;
+}
+
+// Handles a decoded pop_async checkpoint: removes the front element of the
+// Cache when its sequence id matches the one carried by the record.
+//
+// `data` must point to a lgsv_ckpt_msg_v8_t; `cb` is not used.
+// Returns NCSCC_RC_SUCCESS after the pop, or NCSCC_RC_FAILURE (cache left
+// untouched) when the sequence ids differ.
+uint32_t ckpt_proc_pop_async(lgs_cb_t* cb, void* data) {
+  TRACE_ENTER();
+  assert(lgs_is_peer_v8() && "The peer should run with V8 or beyond!");
+  auto data_v8 = static_cast<lgsv_ckpt_msg_v8_t*>(data);
+  auto param = &data_v8->ckpt_rec.pop_async;
+  uint64_t seq_id = param->seq_id;
+  // NOTE(review): Front() is dereferenced unconditionally; confirm that it
+  // is safe to call when the cache is empty.
+  auto top = Cache::instance()->Front();
+  if (top->seq_id_ != seq_id) {
+    LOG_ER("Out of sync! Expected seq: (%" PRIu64 "), Got: (%" PRIu64 ")",
+           seq_id, top->seq_id_);
+    return NCSCC_RC_FAILURE;
+  }
+
+  TRACE("Pop the element with seq id: (%" PRIu64 ")", seq_id);
+  Cache::instance()->Pop();
+  return NCSCC_RC_SUCCESS;
+}
+
+// Handles a decoded pop_and_write_async checkpoint: removes the front
+// element of the Cache when its sequence id matches, copies record_id,
+// file_size and log_file from the record into the stream, and hands the
+// record to WriteOnStandby().
+//
+// `data` must point to a lgsv_ckpt_msg_v8_t; `cb` is not used.
+//
+// The EDU-allocated strings `log_record` and `log_file` are released here
+// on every path that does not reach WriteOnStandby().
+// NOTE(review): WriteOnStandby() presumably releases them on the remaining
+// path - confirm.
+//
+// Returns NCSCC_RC_FAILURE when the sequence ids differ, NCSCC_RC_SUCCESS
+// when the stream id is unknown (the element is still popped), otherwise
+// the result of WriteOnStandby().
+uint32_t ckpt_proc_pop_write_async(lgs_cb_t* cb, void* data) {
+  TRACE_ENTER();
+  assert(lgs_is_peer_v8() && "The peer should run with V8 or beyond!");
+  auto data_v8 = static_cast<lgsv_ckpt_msg_v8_t*>(data);
+  auto param = &data_v8->ckpt_rec.pop_and_write_async;
+  uint64_t seq_id = param->seq_id;
+  // NOTE(review): Front() is dereferenced unconditionally; confirm that it
+  // is safe to call when the cache is empty.
+  auto top = Cache::instance()->Front();
+  if (top->seq_id_ != seq_id) {
+    LOG_ER("Out of sync! Expected seq: (%" PRIu64 "), Got: (%" PRIu64 ")",
+           seq_id, top->seq_id_);
+    // Release the decoded strings here as well, exactly as the
+    // stream-not-found path below does; otherwise they leak.
+    lgs_free_edu_mem(param->log_record);
+    lgs_free_edu_mem(param->log_file);
+    return NCSCC_RC_FAILURE;
+  }
+
+  TRACE("Pop the element with seq id: (%" PRIu64 ")", seq_id);
+  Cache::instance()->Pop();
+
+  char* log_file = param->log_file;
+  auto timestamp = param->timestamp;
+  auto stream = log_stream_get_by_id(param->stream_id);
+  if (stream == NULL) {
+    // stream_id is a uint32_t, hence %u.
+    LOG_NO("Not found stream id (%u)", param->stream_id);
+    lgs_free_edu_mem(param->log_record);
+    lgs_free_edu_mem(log_file);
+    return NCSCC_RC_SUCCESS;
+  }
+
+  stream->logRecordId = param->record_id;
+  stream->curFileSize = param->file_size;
+  stream->logFileCurrent = param->log_file;
+
+  return WriteOnStandby(stream, timestamp, log_file, param->log_record);
+}
+
+/****************************************************************************
+ * Name          : edp_ed_ckpt_msg_v8
+ *
+ * Description   : This function is an EDU program for encoding/decoding
+ *                 lgsv checkpoint messages. This program runs the
+ *                 edp_ed_header_rec program first to decide the
+ *                 checkpoint message type based on which it will call the
+ *                 appropriate EDU programs for the different checkpoint
+ *                 messages.
+ *
+ * Arguments     : EDU_HDL - pointer to edu handle,
+ *                 EDU_TKN - internal edu token to help encode/decode,
+ *                 POINTER to the structure to encode/decode from/to,
+ *                 data length specifying number of structures,
+ *                 EDU_BUF_ENV - pointer to buffer for encoding/decoding.
+ *                 op - operation type being encode/decode.
+ *                 EDU_ERR - out param to indicate errors in processing.
+ *
+ * Return Values : NCSCC_RC_SUCCESS/NCSCC_RC_FAILURE
+ *
+ * Notes         : None.
+ *****************************************************************************/
+
+uint32_t edp_ed_ckpt_msg_v8(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT ptr,
+                            uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env,
+                            EDP_OP_TYPE op, EDU_ERR *o_err) {
+  TRACE_ENTER();
+  lgsv_ckpt_msg_v8_t *ckpt_msg_ptr = NULL, **ckpt_msg_dec_ptr;
+  // The header is processed first; the EDU_TEST rule then calls
+  // ckpt_msg_test_type on it to pick one of the record rules below.
+  // NOTE(review): the order of the record rules presumably has to match the
+  // jump offsets returned by ckpt_msg_test_type - do not reorder without
+  // checking that function.
+  EDU_INST_SET ckpt_msg_ed_rules[] = {
+      {EDU_START, edp_ed_ckpt_msg_v8, 0, 0, 0, sizeof(lgsv_ckpt_msg_v8_t), 0,
+       NULL},
+      {EDU_EXEC, edp_ed_header_rec, 0, 0, 0,
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->header, 0, NULL},
+
+      {EDU_TEST, ncs_edp_uns32, 0, 0, 0,
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->header, 0,
+       (EDU_EXEC_RTINE)ckpt_msg_test_type},
+
+      /* Reg Record */
+      {EDU_EXEC, edp_ed_reg_rec_v6, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.initialize_client, 0,
+       NULL},
+
+      /* Finalize record */
+      {EDU_EXEC, edp_ed_finalize_rec_v2, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.finalize_client, 0, NULL},
+
+      /* write log Record */
+      {EDU_EXEC, edp_ed_write_rec_v2, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.write_log, 0, NULL},
+
+      /* Open stream */
+      {EDU_EXEC, edp_ed_open_stream_rec, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.stream_open, 0, NULL},
+
+      /* Close stream */
+      {EDU_EXEC, edp_ed_close_stream_rec_v2, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.stream_close, 0, NULL},
+
+      /* Agent down. The offset is taken from the member this program
+       * actually fills in (agent_down); all members of the ckpt_rec union
+       * share one offset, so the encoded layout does not change. */
+      {EDU_EXEC, edp_ed_agent_down_rec_v2, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.agent_down, 0, NULL},
+
+      /* Cfg stream */
+      {EDU_EXEC, edp_ed_cfg_stream_rec_v6, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.stream_cfg, 0, NULL},
+
+      /* Lgs cfg */
+      {EDU_EXEC, edp_ed_lgs_cfg_rec_v5, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.lgs_cfg, 0, NULL},
+
+      /* Push a write async */
+      {EDU_EXEC, EncodeDecodePushAsync, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.push_async, 0, NULL},
+
+      /* Pop a write async */
+      {EDU_EXEC, EncodeDecodePopAsync, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.pop_async, 0, NULL},
+
+      /* Pop a write async after the write request has been processed */
+      {EDU_EXEC, EncodeDecodePopAndWriteAsync, 0, 0, static_cast<int>(EDU_EXIT),
+       (long)&((lgsv_ckpt_msg_v8_t *)0)->ckpt_rec.pop_and_write_async, 0,
+       NULL},
+
+      {EDU_END, 0, 0, 0, 0, 0, 0, NULL},
+  };
+
+  if (op == EDP_OP_TYPE_ENC) {
+    ckpt_msg_ptr = static_cast<lgsv_ckpt_msg_v8_t *>(ptr);
+  } else if (op == EDP_OP_TYPE_DEC) {
+    // On decode the caller passes the address of a pre-allocated message,
+    // which is cleared before the rules fill it in.
+    ckpt_msg_dec_ptr = static_cast<lgsv_ckpt_msg_v8_t **>(ptr);
+    if (*ckpt_msg_dec_ptr == NULL) {
+      *o_err = EDU_ERR_MEM_FAIL;
+      return NCSCC_RC_FAILURE;
+    }
+    memset(*ckpt_msg_dec_ptr, '\0', sizeof(lgsv_ckpt_msg_v8_t));
+    ckpt_msg_ptr = *ckpt_msg_dec_ptr;
+  } else {
+    ckpt_msg_ptr = static_cast<lgsv_ckpt_msg_v8_t *>(ptr);
+  }
+
+  return m_NCS_EDU_RUN_RULES(edu_hdl, edu_tkn, ckpt_msg_ed_rules, ckpt_msg_ptr,
+                             ptr_data_len, buf_env, op, o_err);
+}
+
+// Debug helper: writes every field of a CkptPushAsync record to the log at
+// notice level. String fields that are null are printed as "(null)".
+// `data` itself must not be null.
+void Dump(const CkptPushAsync* data) {
+  LOG_NO("- CkptPushAsync info - ");
+  LOG_NO("invocation: %llu", data->invocation);
+  LOG_NO("client_id: %u", data->client_id);
+  LOG_NO("stream_id: %u", data->stream_id);
+  LOG_NO("svc_name: %s", data->svc_name == nullptr ? "(null)" : data->svc_name);
+  LOG_NO("from_node: %s", data->from_node == nullptr ? "(null)" :
+         data->from_node);
+  // Guarded like the other strings: passing a null pointer to %s is
+  // undefined behaviour.
+  LOG_NO("log_record: %s", data->log_record == nullptr ? "(null)" :
+         data->log_record);
+  LOG_NO("seq_id_: %" PRIu64, data->seq_id);
+  LOG_NO("Queue at: %" PRIu64, data->queue_at);
+}
diff --git a/src/log/logd/lgs_mbcsv_cache.h b/src/log/logd/lgs_mbcsv_cache.h
new file mode 100644
index 000000000..a6f5f440b
--- /dev/null
+++ b/src/log/logd/lgs_mbcsv_cache.h
@@ -0,0 +1,110 @@
+/*      -*- OpenSAF  -*-
+ *
+ * Copyright Ericsson AB 2019 - All Rights Reserved.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
+ * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+
+// Checkpoint (MBCSv) message version 8: record types and EDU programs for
+// replicating the queue of pending async write requests.
+
+#ifndef LOG_LOGD_LGS_MBCSV_CACHE_H_
+#define LOG_LOGD_LGS_MBCSV_CACHE_H_
+
+#include "log/logd/lgs_mbcsv_v2.h"
+#include "log/logd/lgs_mbcsv_v3.h"
+#include "log/logd/lgs_mbcsv_v5.h"
+#include "log/logd/lgs_mbcsv_v6.h"
+
+#include "base/ncs_edu_pub.h"
+#include "base/ncsencdec_pub.h"
+
+// Record body of a "push async" checkpoint: one write request to be added
+// to the receiver's cache (see ckpt_proc_push_async).
+// After decoding, svc_name, from_node and log_record are EDU-allocated and
+// are released with lgs_free_edu_mem() by ckpt_proc_push_async.
+struct CkptPushAsync {
+  SaInvocationT invocation;
+  uint32_t ack_flags;
+  uint32_t client_id;
+  uint32_t stream_id;
+  char* svc_name;   // may be null (Dump() prints "(null)")
+  SaTimeT log_stamp;
+  SaLogSeverityT severity;
+  MDS_DEST dest;
+  char* from_node;  // may be null (Dump() prints "(null)")
+
+  uint64_t queue_at;  // presumably the time the request was queued - confirm
+  uint64_t seq_id;    // compared with the cache front when a pop arrives
+  char* log_record;
+};
+
+// Record body of a "pop async" checkpoint: asks the receiver to drop the
+// front cache element, which must carry this seq_id (see
+// ckpt_proc_pop_async).
+struct CkptPopAsync {
+  uint64_t seq_id;
+};
+
+// Record body of a "pop and write async" checkpoint: the front cache element
+// with this seq_id is dropped and the record is written on the receiver
+// (see ckpt_proc_pop_write_async).
+// After decoding, log_file and log_record are EDU-allocated strings.
+struct CkptPopAndWriteAsync {
+  uint32_t stream_id;  // looked up with log_stream_get_by_id()
+  uint32_t record_id;  // copied to the stream's logRecordId
+  uint32_t file_size;  // copied to the stream's curFileSize
+  char* log_file;      // copied to the stream's logFileCurrent
+  char* log_record;
+  uint64_t timestamp;
+  uint64_t seq_id;
+};
+
+// Version 8 checkpoint message: the common header followed by exactly one
+// record body. edp_ed_ckpt_msg_v8 selects the member from the header.
+// push_async, pop_async and pop_and_write_async are the members new in V8.
+struct lgsv_ckpt_msg_v8_t {
+  lgsv_ckpt_header_t header;
+  union {
+    lgs_ckpt_initialize_msg_v6_t initialize_client;
+    lgs_ckpt_finalize_msg_v2_t finalize_client;
+    lgs_ckpt_write_log_v2_t write_log;
+    lgs_ckpt_agent_down_v2_t agent_down;
+    lgs_ckpt_stream_open_t stream_open;
+    lgs_ckpt_stream_close_v2_t stream_close;
+    lgs_ckpt_stream_cfg_v3_t stream_cfg;
+    lgs_ckpt_lgs_cfg_v5_t lgs_cfg;
+    CkptPushAsync push_async;
+    CkptPopAsync pop_async;
+    CkptPopAndWriteAsync pop_and_write_async;
+  } ckpt_rec;
+};
+
+// EDU program for a whole lgsv_ckpt_msg_v8_t (header plus one record).
+uint32_t edp_ed_ckpt_msg_v8(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT ptr,
+                            uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env,
+                            EDP_OP_TYPE op, EDU_ERR *o_err);
+
+// EDU programs for the three V8 record bodies. Each returns
+// NCSCC_RC_SUCCESS or NCSCC_RC_FAILURE.
+uint32_t EncodeDecodePushAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn,
+                               NCSCONTEXT ptr, uint32_t* ptr_data_len,
+                               EDU_BUF_ENV* buf_env, EDP_OP_TYPE op,
+                               EDU_ERR* o_err);
+uint32_t EncodeDecodePopAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn,
+                              NCSCONTEXT ptr, uint32_t* ptr_data_len,
+                              EDU_BUF_ENV* buf_env, EDP_OP_TYPE op,
+                              EDU_ERR* o_err);
+uint32_t EncodeDecodePopAndWriteAsync(EDU_HDL* edu_hdl, EDU_TKN* edu_tkn,
+                                      NCSCONTEXT ptr, uint32_t* ptr_data_len,
+                                      EDU_BUF_ENV* buf_env, EDP_OP_TYPE op,
+                                      EDU_ERR* o_err);
+// Decode one V8 record into the matching ckpt_rec member of `ckpt_msg`
+// (a lgsv_ckpt_msg_v8_t). The peer must run V8.
+uint32_t DecodePushAsync(lgs_cb_t* cb, void* ckpt_msg,
+                         NCS_MBCSV_CB_ARG* cbk_arg);
+uint32_t DecodePopAsync(lgs_cb_t* cb, void* ckpt_msg,
+                        NCS_MBCSV_CB_ARG* cbk_arg);
+uint32_t DecodePopAndWriteAsync(lgs_cb_t* cb, void* ckpt_msg,
+                                NCS_MBCSV_CB_ARG* cbk_arg);
+
+// Apply one decoded V8 record; `data` points to a lgsv_ckpt_msg_v8_t.
+uint32_t ckpt_proc_push_async(lgs_cb_t* cb, void* data);
+uint32_t ckpt_proc_pop_async(lgs_cb_t* cb, void* data);
+uint32_t ckpt_proc_pop_write_async(lgs_cb_t* cb, void* data);
+
+// Logs every field of `data` at notice level (debugging aid).
+void Dump(const CkptPushAsync* data);
+
+
+// Declared only when SIMULATE_NFS_UNRESPONSE is defined; the definition is
+// not in this header.
+#ifdef SIMULATE_NFS_UNRESPONSE
+extern uint32_t test_counter;
+#endif
+
+#endif  // LOG_LOGD_LGS_MBCSV_CACHE_H_
diff --git a/src/log/logd/lgs_mbcsv_v1.cc b/src/log/logd/lgs_mbcsv_v1.cc
index 8fb059ad3..32e877031 100644
--- a/src/log/logd/lgs_mbcsv_v1.cc
+++ b/src/log/logd/lgs_mbcsv_v1.cc
@@ -45,6 +45,7 @@
 uint32_t edp_ed_write_rec_v1(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT ptr,
                              uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env,
                              EDP_OP_TYPE op, EDU_ERR *o_err) {
+  TRACE_ENTER();
   uint32_t rc = NCSCC_RC_SUCCESS;
   lgs_ckpt_write_log_v1_t *ckpt_write_msg_ptr = NULL, **ckpt_write_msg_dec_ptr;
 
diff --git a/src/log/logd/lgs_mbcsv_v2.cc b/src/log/logd/lgs_mbcsv_v2.cc
index 63807e1b7..e543ad7e7 100644
--- a/src/log/logd/lgs_mbcsv_v2.cc
+++ b/src/log/logd/lgs_mbcsv_v2.cc
@@ -100,6 +100,7 @@ uint32_t ckpt_proc_lgs_cfg_v2(lgs_cb_t *cb, void *data) {
 uint32_t edp_ed_write_rec_v2(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT ptr,
uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env, EDP_OP_TYPE op, EDU_ERR *o_err) { + TRACE_ENTER(); uint32_t rc = NCSCC_RC_SUCCESS; lgs_ckpt_write_log_v2_t *ckpt_write_msg_ptr = NULL, **ckpt_write_msg_dec_ptr; @@ -486,6 +487,7 @@ uint32_t edp_ed_agent_down_rec_v2(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, uint32_t edp_ed_ckpt_msg_v2(EDU_HDL *edu_hdl, EDU_TKN *edu_tkn, NCSCONTEXT ptr, uint32_t *ptr_data_len, EDU_BUF_ENV *buf_env, EDP_OP_TYPE op, EDU_ERR *o_err) { + TRACE_ENTER(); uint32_t rc = NCSCC_RC_SUCCESS; lgsv_ckpt_msg_v2_t *ckpt_msg_ptr = NULL, **ckpt_msg_dec_ptr; -- 2.17.1 _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel