+ if (mds_tipc_fctrl_trysend(buffer, buff_len, id) ==
NCSCC_RC_SUCCESS) {
+ send_len = sendto(tipc_cb.BSRsock, buffer, buff_len, 0,
+ (struct sockaddr *)&server_addr,
sizeof(server_addr));
+ if (send_len == buff_len) {
+ m_MDS_LOG_INFO("MDTM: Successfully sent message");
+ return NCSCC_RC_SUCCESS;
+ } else if (send_len == -1) {
+ m_MDS_LOG_ERR("MDTM: Failed to send message err :%s",
strerror(errno));
+ // todo: it's failed to send msg, if flow control is
enabled
+ // the msg needs to mark unsent
+ return NCSCC_RC_FAILURE;
+ } else {
+ m_MDS_LOG_ERR("MDTM: Failed to send message send_len
:%zd", send_len);
+ // todo: it's failed to send msg, if flow control is
enabled
+ // the msg needs to mark unsent
+ return NCSCC_RC_FAILURE;
+ }
}
+ return NCSCC_RC_SUCCESS;
}
/*********************************************************
@@ -3103,12 +3171,12 @@ static uint32_t mdtm_mcast_sendto(void
*buffer, size_t size,
server_addr.addr.nameseq.lower = HTONL(MDS_MDTM_LOWER_INSTANCE);
/*This can be scope-down to dest_svc_id server_inst TBD*/
server_addr.addr.nameseq.upper = HTONL(MDS_MDTM_UPPER_INSTANCE);
-
int send_len =
sendto(tipc_cb.BSRsock, buffer, size, 0,
(struct sockaddr *)&server_addr, sizeof(server_addr));
+
if (send_len == size) {
- m_MDS_LOG_INFO("MDTM: Successfully sent message");
+ m_MDS_LOG_INFO("MDTM: Successfully sent mcast message");
return NCSCC_RC_SUCCESS;
} else {
m_MDS_LOG_ERR("MDTM: Failed to send Multicast message err
:%s",
@@ -3151,7 +3219,7 @@ static uint32_t mdtm_add_mds_hdr(uint8_t
*buffer, MDTM_SEND_REQ *req)
uint8_t *ptr;
ptr = buffer;
- prot_ver |= MDS_PROT | MDS_VERSION | ((uint8_t)(req->pri - 1));
+ prot_ver |= gl_mds_pro_ver | ((uint8_t)(req->pri - 1));
enc_snd_type = (req->msg.encoding << 6);
enc_snd_type |= (uint8_t)req->snd_type;
diff --git a/src/mds/mds_tipc_fctrl_intf.cc
b/src/mds/mds_tipc_fctrl_intf.cc
new file mode 100644
index 0000000..91b9107
--- /dev/null
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -0,0 +1,376 @@
+/* -*- OpenSAF -*-
+ *
+ * (C) Copyright 2019 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are
licensed
+ * under the GNU Lesser General Public License Version 2.1,
February 1999.
+ * The complete license can be accessed from the following location:
+ *
https://protect2.fireeye.com/url?k=a4ac81b5-f8252ede-a4acc12e-0cc47ad93ea2-93ec22754767d852&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for
full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+#include "mds/mds_tipc_fctrl_intf.h"
+
+#include <poll.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <sys/poll.h>
+#include <unistd.h>
+
+#include <map>
+#include <mutex>
+
+#include "base/ncssysf_def.h"
+#include "base/ncssysf_tsk.h"
+
+#include "mds/mds_log.h"
+#include "mds/mds_tipc_fctrl_portid.h"
+#include "mds/mds_tipc_fctrl_msg.h"
+
+using mds::Event;
+using mds::TipcPortId;
+using mds::DataMessage;
+using mds::ChunkAck;
+using mds::HeaderMessage;
+
+namespace {
+// multicast/broadcast enabled
+// todo: to be removed if flow control support it
+bool is_mcast_enabled = true;
+
+// mailbox handles for events
+SYSF_MBX mbx_events;
+
+// ncs task handle
+NCSCONTEXT p_task_hdl = nullptr;
+
+// data socket associated with port id
+int data_sock_fd = 0;
+struct tipc_portid snd_rcv_portid;
+
+// socket receiver's buffer size
+// todo: This buffer size is read from the local node as an estimated
+// buffer size of the receiver sides, in facts that could be different
+// at the receiver's sides (in the other nodes). At this moment, we
+// assume all nodes in cluster have set the same tipc buffer size
+uint64_t sock_buf_size = 0;
+
+// map of key:unique id(adest), value: TipcPortId instance
+// unique id is derived from struct tipc_portid
+std::map<uint64_t, TipcPortId*> portid_map;
+std::mutex portid_map_mutex;
+
+// chunk ack parameters
+// todo: The chunk ack size should be configurable
+uint16_t kChunkAckSize = 3;
+
+TipcPortId* portid_lookup(struct tipc_portid id) {
+ uint64_t uid = TipcPortId::GetUniqueId(id);
+ if (portid_map.find(uid) == portid_map.end()) return nullptr;
+ return portid_map[uid];
+}
+
+uint32_t process_flow_event(const Event evt) {
+ uint32_t rc = NCSCC_RC_SUCCESS;
+ TipcPortId *portid = portid_lookup(evt.id_);
+ if (portid == nullptr) {
+ if (evt.type_ == Event::Type::kEvtRcvData) {
+ portid = new TipcPortId(evt.id_, data_sock_fd, kChunkAckSize,
+ sock_buf_size);
+ portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
+ if (evt.type_ == Event::Type::kEvtRcvData) {
+ rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
+ evt.fseq_, evt.svc_id_);
+ }
+ }
+ } else {
+ if (evt.type_ == Event::Type::kEvtRcvData) {
+ rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
+ evt.fseq_, evt.svc_id_);
+ }
+ if (evt.type_ == Event::Type::kEvtRcvChunkAck) {
+ portid->ReceiveChunkAck(evt.fseq_, evt.chunk_size_);
+ }
+ if (evt.type_ == Event::Type::kEvtSendChunkAck) {
+ portid->SendChunkAck(evt.fseq_, evt.svc_id_, evt.chunk_size_);
+ }
+ if (evt.type_ == Event::Type::kEvtDropData) {
+ portid->ReceiveNack(evt.mseq_, evt.mfrag_,
+ evt.fseq_);
+ }
+ }
+ return rc;
+}
+
+uint32_t process_all_events(void) {
+ enum { FD_FCTRL = 0, NUM_FDS };
+
+ int poll_tmo = MDTM_TIPC_POLL_TIMEOUT;
+ while (true) {
+ int pollres;
+ struct pollfd pfd[NUM_FDS] = {{0}};
+
+ pfd[FD_FCTRL].fd =
+ ncs_ipc_get_sel_obj(&mbx_events).rmv_obj;
+ pfd[FD_FCTRL].events = POLLIN;
+
+ pollres = poll(pfd, NUM_FDS, poll_tmo);
+
+ if (pollres == -1) {
+ if (errno == EINTR) continue;
+ m_MDS_LOG_ERR("FCTRL: poll() failed:%s", strerror(errno));
+ break;
+ }
+
+ if (pollres > 0) {
+ if (pfd[FD_FCTRL].revents == POLLIN) {
+ Event *evt = reinterpret_cast<Event*>(ncs_ipc_non_blk_recv(
+ &mbx_events));
+
+ if (evt == nullptr) continue;
+
+ portid_map_mutex.lock();
+ process_flow_event(*evt);
+ delete evt;
+ portid_map_mutex.unlock();
+ }
+ }
+ } /* while */
+ return NCSCC_RC_SUCCESS;
+}
+
+uint32_t create_ncs_task(void *task_hdl) {
+ int policy = SCHED_OTHER; /*root defaults */
+ int max_prio = sched_get_priority_max(policy);
+ int min_prio = sched_get_priority_min(policy);
+ int prio_val = ((max_prio - min_prio) * 0.87);
+
+ if (m_NCS_IPC_CREATE(&mbx_events) != NCSCC_RC_SUCCESS) {
+ m_MDS_LOG_ERR("m_NCS_IPC_CREATE failed");
+ return NCSCC_RC_FAILURE;
+ }
+ if (m_NCS_IPC_ATTACH(&mbx_events) != NCSCC_RC_SUCCESS) {
+ m_MDS_LOG_ERR("m_NCS_IPC_ATTACH failed");
+ return NCSCC_RC_FAILURE;
+ }
+ if (ncs_task_create((NCS_OS_CB)process_all_events, 0,
+ "OSAF_MDS", prio_val, policy, NCS_MDTM_STACKSIZE,
+ &task_hdl) != NCSCC_RC_SUCCESS) {
+ m_MDS_LOG_ERR("FCTRL: Task Creation-failed:\n");
+ return NCSCC_RC_FAILURE;
+ }
+
+ m_MDS_LOG_NOTIFY("FCTRL: Start process_all_events");
+ return NCSCC_RC_SUCCESS;
+}
+
+} // end local namespace
+
+uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct
tipc_portid id,
+ uint64_t rcv_buf_size, bool mcast_enabled) {
+ if (create_ncs_task(&p_task_hdl) !=
+ NCSCC_RC_SUCCESS) {
+ m_MDS_LOG_ERR("FCTRL: Start of the Created Task-failed:\n");
+ return NCSCC_RC_FAILURE;
+ }
+ data_sock_fd = dgramsock;
+ snd_rcv_portid = id;
+ sock_buf_size = rcv_buf_size;
+ is_mcast_enabled = mcast_enabled;
+
+ m_MDS_LOG_NOTIFY("FCTRL: Initialize [node:%x, ref:%u]",
+ id.node, id.ref);
+
+ return NCSCC_RC_SUCCESS;
+}
+
+uint32_t mds_tipc_fctrl_shutdown(void) {
+ if (ncs_task_release(p_task_hdl) != NCSCC_RC_SUCCESS) {
+ m_MDS_LOG_ERR("FCTRL: Stop of the Created Task-failed:\n");
+ }
+ return NCSCC_RC_SUCCESS;
+}
+
+uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id,
uint16_t len,
+ uint16_t* next_seq) {
+ uint32_t rc = NCSCC_RC_SUCCESS;
+
+ portid_map_mutex.lock();
+
+ TipcPortId *portid = portid_lookup(id);
+ if (portid == nullptr) {
+ m_MDS_LOG_ERR("FCTRL: PortId not found [node:%x, ref:%u] line:%u",
+ id.node, id.ref, __LINE__);
+ rc = NCSCC_RC_FAILURE;
+ } else {
+ // assign the sequence number of the outgoing message
+ *next_seq = portid->GetCurrentSeq();
+ }
+
+ portid_map_mutex.unlock();
+
+ return rc;
+}
+
+uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len,
+ struct tipc_portid id) {
+ uint32_t rc = NCSCC_RC_SUCCESS;
+
+ portid_map_mutex.lock();
+
+ TipcPortId *portid = portid_lookup(id);
+ if (portid == nullptr) {
+ m_MDS_LOG_ERR("FCTRL: PortId not found [node:%x, ref:%u] line:%u",
+ id.node, id.ref, __LINE__);
+ rc = NCSCC_RC_FAILURE;
+ } else {
+ portid->Queue(buffer, len);
+ }
+
+ portid_map_mutex.unlock();
+
+ return rc;
+}
+
+uint32_t mds_tipc_fctrl_portid_up(struct tipc_portid id, uint32_t
type) {
+ MDS_SVC_ID svc_id = (uint16_t)(type & MDS_EVENT_MASK_FOR_SVCID);
+
+ portid_map_mutex.lock();
+
+ // Add this new tipc portid to the map
+ TipcPortId *portid = portid_lookup(id);
+ uint64_t uid = TipcPortId::GetUniqueId(id);
+ if (portid == nullptr) {
+ portid_map[uid] = portid = new TipcPortId(id, data_sock_fd,
+ kChunkAckSize, sock_buf_size);
+ m_MDS_LOG_NOTIFY("FCTRL: Add portid[node:%x, ref:%u svc_id:%u],
svc_cnt:%u",
+ id.node, id.ref, svc_id, portid->svc_cnt_);
+ } else {
+ portid->svc_cnt_++;
+ m_MDS_LOG_NOTIFY("FCTRL: Add svc[node:%x, ref:%u svc_id:%u],
svc_cnt:%u",
+ id.node, id.ref, svc_id, portid->svc_cnt_);
+ }
+
+ portid_map_mutex.unlock();
+
+ return NCSCC_RC_SUCCESS;
+}
+
+uint32_t mds_tipc_fctrl_portid_down(struct tipc_portid id, uint32_t
type) {
+ MDS_SVC_ID svc_id = (uint16_t)(type & MDS_EVENT_MASK_FOR_SVCID);
+
+ portid_map_mutex.lock();
+
+ // Delete this tipc portid out of the map
+ TipcPortId *portid = portid_lookup(id);
+ if (portid != nullptr) {
+ portid->svc_cnt_--;
+ m_MDS_LOG_NOTIFY("FCTRL: Remove svc[node:%x, ref:%u svc_id:%u],
svc_cnt:%u",
+ id.node, id.ref, svc_id, portid->svc_cnt_);
+ }
+ portid_map_mutex.unlock();
+
+ return NCSCC_RC_SUCCESS;
+}
+
+uint32_t mds_tipc_fctrl_portid_terminate(struct tipc_portid id) {
+ portid_map_mutex.lock();
+
+ // Delete this tipc portid out of the map
+ TipcPortId *portid = portid_lookup(id);
+ if (portid != nullptr) {
+ delete portid;
+ portid_map.erase(TipcPortId::GetUniqueId(id));
+ m_MDS_LOG_NOTIFY("FCTRL: Remove portid[node:%x, ref:%u]",
id.node, id.ref);
+ }
+
+ portid_map_mutex.unlock();
+
+ return NCSCC_RC_SUCCESS;
+}
+
+uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len,
+ struct tipc_portid id) {
+ HeaderMessage header;
+ header.Decode(buffer);
+ // if mds support flow control
+ if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) {
+ if (header.pro_id_ == MDS_PROT_FCTRL_ID) {
+ if (header.msg_type_ == ChunkAck::kChunkAckMsgType) {
+ // receive single ack message
+ ChunkAck ack;
+ ack.Decode(buffer);
+ // send to the event thread
+ if (m_NCS_IPC_SEND(&mbx_events,
+ new Event(Event::Type::kEvtSendChunkAck, id, ack.svc_id_,
+ header.mseq_, header.mfrag_, ack.acked_fseq_,
ack.chunk_size_),
+ NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
+ m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n");
+ return NCSCC_RC_FAILURE;
+ }
+ return NCSCC_RC_SUCCESS;
+ }
+ } else {
+ // receive data message
+ DataMessage data;
+ data.Decode(buffer);
+ // send to the event thread
+ if (m_NCS_IPC_SEND(&mbx_events,
+ new Event(Event::Type::kEvtDropData, id, data.svc_id_,
+ header.mseq_, header.mfrag_, header.fseq_),
+ NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
+ m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n");
+ return NCSCC_RC_FAILURE;
+ }
+ return NCSCC_RC_SUCCESS;
+ }
+ }
+
+ return NCSCC_RC_SUCCESS;
+}
+
+uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
+ struct tipc_portid id) {
+ HeaderMessage header;
+ header.Decode(buffer);
+ // if mds support flow control
+ if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) {
+ if (header.pro_id_ == MDS_PROT_FCTRL_ID) {
+ if (header.msg_type_ == ChunkAck::kChunkAckMsgType) {
+ // receive single ack message
+ ChunkAck ack;
+ ack.Decode(buffer);
+ // send to the event thread
+ if (m_NCS_IPC_SEND(&mbx_events,
+ new Event(Event::Type::kEvtRcvChunkAck, id, ack.svc_id_,
+ header.mseq_, header.mfrag_, ack.acked_fseq_,
ack.chunk_size_),
+ NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
+ m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n");
+ }
+ // return NCSCC_RC_FAILURE, so the tipc receiving thread
(legacy) will
+ // skip this data msg
+ return NCSCC_RC_FAILURE;
+ }
+ } else {
+ // receive data message
+ DataMessage data;
+ data.Decode(buffer);
+ // todo: skip mcast/bcast, revisit
+ if ((data.snd_type_ == MDS_SENDTYPE_BCAST ||
+ data.snd_type_ == MDS_SENDTYPE_RBCAST) &&
is_mcast_enabled) {
+ return NCSCC_RC_SUCCESS;
+ }
+ portid_map_mutex.lock();
+ uint32_t rc = process_flow_event(Event(Event::Type::kEvtRcvData,
+ id, data.svc_id_, header.mseq_, header.mfrag_,
header.fseq_));
+ portid_map_mutex.unlock();
+ return rc;
+ }
+ }
+ return NCSCC_RC_SUCCESS;
+}
diff --git a/src/mds/mds_tipc_fctrl_intf.h
b/src/mds/mds_tipc_fctrl_intf.h
new file mode 100644
index 0000000..85a058f
--- /dev/null
+++ b/src/mds/mds_tipc_fctrl_intf.h
@@ -0,0 +1,47 @@
+/* -*- OpenSAF -*-
+ *
+ * (C) Copyright 2019 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are
licensed
+ * under the GNU Lesser General Public License Version 2.1,
February 1999.
+ * The complete license can be accessed from the following location:
+ *
https://protect2.fireeye.com/url?k=8c56609d-d0dfcff6-8c562006-0cc47ad93ea2-edf93c78f64bf311&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for
full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+
+#ifndef MDS_MDS_TIPC_FCTRL_INTF_H_
+#define MDS_MDS_TIPC_FCTRL_INTF_H_
+
+#include <linux/tipc.h>
+#include <stdbool.h>
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct
tipc_portid id,
+ uint64_t rcv_buf_size, bool mbrcast_enabled);
+uint32_t mds_tipc_fctrl_shutdown(void);
+uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
+ struct tipc_portid id);
+uint32_t mds_tipc_fctrl_portid_up(struct tipc_portid id, uint32_t
type);
+uint32_t mds_tipc_fctrl_portid_down(struct tipc_portid id, uint32_t
type);
+uint32_t mds_tipc_fctrl_portid_terminate(struct tipc_portid id);
+uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len,
+ struct tipc_portid id);
+uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id,
uint16_t len,
+ uint16_t* next_seq);
+uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len,
+ struct tipc_portid id);
+#ifdef __cplusplus
+}
+#endif
+
+#endif // MDS_MDS_TIPC_FCTRL_INTF_H_
diff --git a/src/mds/mds_tipc_fctrl_msg.cc
b/src/mds/mds_tipc_fctrl_msg.cc
new file mode 100644
index 0000000..abd38d3
--- /dev/null
+++ b/src/mds/mds_tipc_fctrl_msg.cc
@@ -0,0 +1,142 @@
+/* -*- OpenSAF -*-
+ *
+ * (C) Copyright 2019 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are
licensed
+ * under the GNU Lesser General Public License Version 2.1,
February 1999.
+ * The complete license can be accessed from the following location:
+ *
https://protect2.fireeye.com/url?k=6b1e74f3-3797db98-6b1e3468-0cc47ad93ea2-3e7c86e3dfcd12a1&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for
full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+
+#include "mds/mds_tipc_fctrl_msg.h"
+#include "base/ncssysf_def.h"
+
+namespace mds {
+HeaderMessage::HeaderMessage(uint16_t msg_len, uint32_t mseq,
+ uint16_t mfrag, uint16_t fseq): msg_len_(msg_len),
+ mseq_(mseq), mfrag_(mfrag), fseq_(fseq) {
+ pro_ver_ = MDS_PROT_FCTRL;
+ pro_id_ = 0;
+ msg_type_ = 0;
+}
+
+void HeaderMessage::Encode(uint8_t *msg) {
+ uint8_t *ptr;
+
+ // encode message length
+ ptr = &msg[0];
+ ncs_encode_16bit(&ptr, msg_len_);
+ // encode sequence number
+ ptr = &msg[2];
+ ncs_encode_32bit(&ptr, mseq_);
+ // encode sequence number
+ ptr = &msg[6];
+ ncs_encode_16bit(&ptr, mfrag_);
+ // skip length_check: oct8&9
+ // encode protocol version
+ ptr = &msg[10];
+ ncs_encode_8bit(&ptr, MDS_PROT_FCTRL);
+}
+
+void HeaderMessage::Decode(uint8_t *msg) {
+ uint8_t *ptr;
+
+ // decode message length
+ ptr = &msg[0];
+ msg_len_ = ncs_decode_16bit(&ptr);
+ // decode sequence number
+ ptr = &msg[2];
+ mseq_ = ncs_decode_32bit(&ptr);
+ // decode fragment number
+ ptr = &msg[6];
+ mfrag_ = ncs_decode_16bit(&ptr);
+ // decode protocol version
+ ptr = &msg[10];
+ pro_ver_ = ncs_decode_8bit(&ptr);
+ if ((pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) {
+ // decode flow control sequence number
+ ptr = &msg[8];
+ fseq_ = ncs_decode_16bit(&ptr);
+ // decode protocol identifier
+ ptr = &msg[11];
+ pro_id_ = ncs_decode_32bit(&ptr);
+ if (pro_id_ == MDS_PROT_FCTRL_ID) {
+ // decode message type
+ ptr = &msg[15];
+ msg_type_ = ncs_decode_8bit(&ptr);
+ }
+ } else {
+ if (mfrag_ != 0) {
+ ptr = &msg[8];
+ fseq_ = ncs_decode_16bit(&ptr);
+ if (fseq_ != 0) pro_ver_ = MDS_PROT_FCTRL;
+ }
+ }
+}
+
+void DataMessage::Decode(uint8_t *msg) {
+ uint8_t *ptr;
+
+ // decode service id
+ ptr = &msg[MDTM_PKT_TYPE_OFFSET +
+ MDTM_FRAG_HDR_LEN +
+ MDS_HEADER_RCVR_SVC_ID_POSITION];
+ svc_id_ = ncs_decode_16bit(&ptr);
+ // decode snd_type
+ ptr = &msg[17];
+ snd_type_ = (ncs_decode_8bit(&ptr)) & 0x3f;
+}
+
+DataMessage::~DataMessage() {
+ if (msg_data_ != nullptr) {
+ delete msg_data_;
+ msg_data_ = nullptr;
+ }
+}
+
+ChunkAck::ChunkAck(uint16_t svc_id, uint16_t fseq, uint16_t
chunk_size):
+ svc_id_(svc_id), acked_fseq_(fseq), chunk_size_(chunk_size) {
+ msg_type_ = kChunkAckMsgType;
+}
+
+void ChunkAck::Encode(uint8_t *msg) {
+ uint8_t *ptr;
+ // encode protocol identifier
+ ptr = &msg[11];
+ ncs_encode_32bit(&ptr, MDS_PROT_FCTRL_ID);
+ // encode message type
+ ptr = &msg[15];
+ ncs_encode_8bit(&ptr, kChunkAckMsgType);
+ // encode service id
+ ptr = &msg[16];
+ ncs_encode_16bit(&ptr, svc_id_);
+ // encode flow control sequence number
+ ptr = &msg[18];
+ ncs_encode_16bit(&ptr, acked_fseq_);
+ // encode chunk size
+ ptr = &msg[20];
+ ncs_encode_16bit(&ptr, chunk_size_);
+}
+
+void ChunkAck::Decode(uint8_t *msg) {
+ uint8_t *ptr;
+
+ // decode service id
+ ptr = &msg[16];
+ svc_id_ = ncs_decode_16bit(&ptr);
+ // decode flow control sequence number
+ ptr = &msg[18];
+ acked_fseq_ = ncs_decode_16bit(&ptr);
+ // decode chunk size
+ ptr = &msg[20];
+ chunk_size_ = ncs_decode_16bit(&ptr);
+}
+
+} // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_msg.h
b/src/mds/mds_tipc_fctrl_msg.h
new file mode 100644
index 0000000..677f256
--- /dev/null
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -0,0 +1,129 @@
+/* -*- OpenSAF -*-
+ *
+ * (C) Copyright 2019 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are
licensed
+ * under the GNU Lesser General Public License Version 2.1,
February 1999.
+ * The complete license can be accessed from the following location:
+ *
https://protect2.fireeye.com/url?k=4fa312f3-132abd98-4fa35268-0cc47ad93ea2-ca1523d965a4960f&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for
full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+
+#ifndef MDS_MDS_TIPC_FCTRL_MSG_H_
+#define MDS_MDS_TIPC_FCTRL_MSG_H_
+
+#include <linux/tipc.h>
+#include "base/ncssysf_ipc.h"
+#include "base/ncssysf_tmr.h"
+#include "mds/mds_dt.h"
+
+namespace mds {
+
+class Event {
+ public:
+ enum class Type {
+ kEvtPortIdAll = 0,
+ kEvtPortIdUp, // event of new portid published (not used)
+ kEvtPortIdDown, // event of portid widthdrawn (not used)
+
+ kEvtDataFlowAll,
+ kEvtRcvData, // event that received data msg from peer
+ kEvtSendChunkAck, // event to send the ack for a chunk of N
accumulative
+ // data msgs (N>=1)
+ kEvtRcvChunkAck, // event that received the ack for a chunk of N
+ // accumulative data msgs (N>=1)
+ kEvtSendSelectiveAck, // event to send the ack for a number of
selective
+ // data msgs (not supported)
+ kEvtRcvSelectiveAck, // event that received the ack for a
numer of
+ // selective data msgs (not supported)
+ kEvtDropData, // event reported from tipc that a
message is not
+ // delivered
+ };
+ NCS_IPC_MSG next_{0};
+ Type type_;
+
+ // Used for flow event only
+ struct tipc_portid id_;
+ uint16_t svc_id_{0};
+ uint32_t mseq_{0};
+ uint16_t mfrag_{0};
+ uint16_t fseq_{0};
+ uint16_t chunk_size_{1};
+ explicit Event(Type type):type_(type) {}
+ Event(Type type, struct tipc_portid id, uint16_t svc_id,
+ uint32_t mseq, uint16_t mfrag, uint16_t f_seg_num):
+ id_(id), svc_id_(svc_id),
+ mseq_(mseq), mfrag_(mfrag), fseq_(f_seg_num) {
+ type_ = type;
+ }
+ Event(Type type, struct tipc_portid id, uint16_t svc_id, uint32_t
mseq,
+ uint16_t mfrag, uint16_t f_seg_num, uint16_t chunk_size):
+ id_(id), svc_id_(svc_id), mseq_(mseq), mfrag_(mfrag),
+ fseq_(f_seg_num), chunk_size_(chunk_size) {
+ type_ = type;
+ }
+};
+
+class BaseMessage {
+ public:
+ virtual ~BaseMessage() {}
+ virtual void Decode(uint8_t *msg) {}
+ virtual void Encode(uint8_t *msg) {}
+};
+
+class HeaderMessage: public BaseMessage {
+ public:
+ uint8_t* msg_ptr_{nullptr};
+ uint16_t msg_len_{0};
+ uint32_t mseq_{0};
+ uint16_t mfrag_{0};
+ uint16_t fseq_{0};
+ uint8_t pro_ver_{0};
+ uint32_t pro_id_{0};
+ uint16_t msg_type_{0};
+ HeaderMessage() {}
+ HeaderMessage(uint16_t msg_len, uint32_t mseq, uint16_t mfrag,
+ uint16_t fseq);
+ virtual ~HeaderMessage() {}
+ void Decode(uint8_t *msg) override;
+ void Encode(uint8_t *msg) override;
+};
+
+class DataMessage: public BaseMessage {
+ public:
+ HeaderMessage header_;
+ uint16_t svc_id_{0};
+
+ uint8_t* msg_data_{nullptr};
+ uint8_t snd_type_{0};
+
+ DataMessage() {}
+ virtual ~DataMessage();
+ void Decode(uint8_t *msg) override;
+};
+
+class ChunkAck: public BaseMessage {
+ public:
+ static const uint8_t kChunkAckMsgType = 1;
+ static const uint16_t kChunkAckMsgLength = 22;
+
+ uint8_t msg_type_{0};
+ uint16_t svc_id_{0};
+ uint16_t acked_fseq_{0};
+ uint16_t chunk_size_{1};
+ ChunkAck() {}
+ ChunkAck(uint16_t svc_id, uint16_t fseq, uint16_t chunk_size);
+ virtual ~ChunkAck() {}
+ void Encode(uint8_t *msg) override;
+ void Decode(uint8_t *msg) override;
+};
+
+} // end namespace mds
+
+#endif // MDS_MDS_TIPC_FCTRL_MSG_H_
diff --git a/src/mds/mds_tipc_fctrl_portid.cc
b/src/mds/mds_tipc_fctrl_portid.cc
new file mode 100644
index 0000000..24d13ee
--- /dev/null
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -0,0 +1,261 @@
+/* -*- OpenSAF -*-
+ *
+ * (C) Copyright 2019 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are
licensed
+ * under the GNU Lesser General Public License Version 2.1,
February 1999.
+ * The complete license can be accessed from the following location:
+ *
https://protect2.fireeye.com/url?k=0f27255d-53ae8a36-0f2765c6-0cc47ad93ea2-763cb574242dc0e2&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for
full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+
+#include "mds/mds_tipc_fctrl_portid.h"
+#include "base/ncssysf_def.h"
+
+#include "mds/mds_dt.h"
+#include "mds/mds_log.h"
+
+namespace mds {
+
+void MessageQueue::Queue(DataMessage* msg) {
+ queue_.push_back(msg);
+}
+
+DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) {
+ for (auto it = queue_.begin(); it != queue_.end(); ++it) {
+ DataMessage *m = *it;
+ if (m->header_.mseq_ == mseq && m->header_.mfrag_ == mfrag) {
+ return m;
+ }
+ }
+ return nullptr;
+}
+
+uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) {
+ uint64_t msg_len = 0;
+ for (auto it = queue_.begin(); it != queue_.end();) {
+ DataMessage *m = *it;
+ if (fseq_from <= m->header_.fseq_ &&
+ m->header_.fseq_ <= fseq_to) {
+ msg_len += m->header_.msg_len_;
+ it = queue_.erase(it);
+ delete m;
+ } else {
+ it++;
+ }
+ }
+ return msg_len;
+}
+
+void MessageQueue::Clear() {
+ while (queue_.empty() == false) {
+ DataMessage* msg = queue_.front();
+ queue_.pop_front();
+ delete msg;
+ }
+}
+
+TipcPortId::TipcPortId(struct tipc_portid id, int sock, uint16_t
chksize,
+ uint64_t sock_buf_size):
+ id_(id), bsrsock_(sock), chunk_size_(chksize),
rcv_buf_size_(sock_buf_size) {
+}
+
+TipcPortId::~TipcPortId() {
+ // clear all msg in sndqueue_
+ sndqueue_.Clear();
+}
+
+uint64_t TipcPortId::GetUniqueId(struct tipc_portid id) {
+ // this uid is equivalent to the mds adest
+ uint64_t uid = ((uint64_t)id.node << 32) | (uint64_t)id.ref;
+ return uid;
+}
+
+uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
+ struct sockaddr_tipc server_addr;
+ ssize_t send_len = 0;
+ uint32_t rc = NCSCC_RC_SUCCESS;
+
+ memset(&server_addr, 0, sizeof(server_addr));
+ server_addr.family = AF_TIPC;
+ server_addr.addrtype = TIPC_ADDR_ID;
+ server_addr.addr.id = id_;
+ send_len = sendto(bsrsock_, data, length, 0,
+ (struct sockaddr *)&server_addr, sizeof(server_addr));
+
+ if (send_len == length) {
+ rc = NCSCC_RC_SUCCESS;
+ } else {
+ m_MDS_LOG_ERR("sendto() err :%s", strerror(errno));
+ rc = NCSCC_RC_FAILURE;
+ }
+ return rc;
+}
+
+uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length) {
+ uint32_t rc = NCSCC_RC_SUCCESS;
+
+ DataMessage *msg = new DataMessage;
+ msg->header_.Decode(const_cast<uint8_t*>(data));
+ msg->Decode(const_cast<uint8_t*>(data));
+ msg->msg_data_ = new uint8_t[length];
+ memcpy(msg->msg_data_, data, length);
+ sndqueue_.Queue(msg);
+ ++sndwnd_.send_;
+ sndwnd_.nacked_space_ += length;
+ m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+ "SndData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
+ "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
+ id_.node, id_.ref,
+ msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_,
length,
+ sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+
+ return rc;
+}
+
+bool TipcPortId::ReceiveCapable(uint16_t sending_len) {
+ return true;
+}
+
+void TipcPortId::SendChunkAck(uint16_t fseq, uint16_t svc_id,
+ uint16_t chksize) {
+ uint8_t data[ChunkAck::kChunkAckMsgLength];
+
+ HeaderMessage header(ChunkAck::kChunkAckMsgLength, 0, 0, fseq);
+ header.Encode(reinterpret_cast<uint8_t*>(&data));
+
+ ChunkAck sack(svc_id, fseq, chksize);
+ sack.Encode(reinterpret_cast<uint8_t*>(&data));
+ Send(reinterpret_cast<uint8_t*>(&data),
ChunkAck::kChunkAckMsgLength);
+ m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+ "SndChkAck[fseq:%u, chunk:%u]",
+ id_.node, id_.ref,
+ fseq, chksize);
+}
+
+uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
+ uint16_t fseq, uint16_t svc_id) {
+ uint32_t rc = NCSCC_RC_SUCCESS;
+ // update receiver sequence window
+ if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) {
+ m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
+ "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "]",
+ id_.node, id_.ref,
+ mseq, mfrag, fseq,
+ rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+
+ ++rcvwnd_.rcv_;
+ if (rcvwnd_.rcv_ - rcvwnd_.acked_ >= chunk_size_) {
+ // send ack for @chunk_size_ msgs starting from fseq
+ SendChunkAck(fseq, svc_id, chunk_size_);
+ rcvwnd_.acked_ = rcvwnd_.rcv_;
+ }
+ } else {
+ // todo: update rcvwnd_.nacked_space_.
+ // This nacked_space_ will tell the number of bytes that has
not been acked
+ // to the sender. If this nacked_space_ is growing large, and
approaching
+ // the socket buffer size, the transmission of sender may be
queued.
+ // this nacked_space_ can be used to detect if the
kChunkAckTimeout or
+ // kChunkAckSize are set excessively large.
+ // It is not used for now, so ignore it.
+
+ // check for transmission error
+ if (rcvwnd_.rcv_ + 1 < fseq) {
+ if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) {
+ // peer does not realize that this portid reset
+ m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
+ "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
+ "Warning[portid reset]",
+ id_.node, id_.ref,
+ mseq, mfrag, fseq,
+ rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+
+ rcvwnd_.rcv_ = fseq;
+ } else {
+ rc = NCSCC_RC_FAILURE;
+ // msg loss
+ m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
+ "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
+ "Error[msg loss]",
+ id_.node, id_.ref,
+ mseq, mfrag, fseq,
+ rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+ }
+ }
+ if (fseq <= rcvwnd_.acked_) {
+ rc = NCSCC_RC_FAILURE;
+ // unexpected retransmission
+ m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
+ "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
+ "Error[unexpected retransmission]",
+ id_.node, id_.ref,
+ mseq, mfrag, fseq,
+ rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+ }
+ }
+ return rc;
+}
+
+void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
+ // update sender sequence window
+ if (sndwnd_.acked_ < fseq && fseq < sndwnd_.send_) {
+ m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "RcvChkAck[fseq:%u, chunk:%u], "
+ "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], "
+ "queue[size:%" PRIu64 "]",
+ id_.node, id_.ref,
+ fseq, chksize,
+ sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_,
+ sndqueue_.Size());
+
+ // fast forward the sndwnd_.acked_ sequence to fseq
+ sndwnd_.acked_ = fseq;
+
+ // remove a number @chksize messages out of sndqueue_ and decrease
+ // the nacked_space_ of sender
+ sndwnd_.nacked_space_ -= sndqueue_.Erase(fseq - chksize + 1,
fseq);
+ } else {
+ m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "RcvChkAck[fseq:%u, chunk:%u], "
+ "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], "
+ "queue[size:%" PRIu64 "], "
+ "Error[msg disordered]",
+ id_.node, id_.ref,
+ fseq, chksize,
+ sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_,
+ sndqueue_.Size());
+ }
+}
+
+void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
+ uint16_t fseq) {
+ DataMessage* msg = sndqueue_.Find(mseq, mfrag);
+ if (msg != nullptr) {
+ // Resend the msg found
+ Send(msg->msg_data_, msg->header_.msg_len_);
+ m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+ "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
+ "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
+ id_.node, id_.ref,
+ mseq, mfrag, fseq,
+ sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+ } else {
+ m_MDS_LOG_ERR("FCTRL: [me] --> [node:%x, ref:%u], "
+ "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
+ "Error[msg not found]",
+ id_.node, id_.ref,
+ mseq, mfrag, fseq);
+ }
+}
+
+} // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_portid.h
b/src/mds/mds_tipc_fctrl_portid.h
new file mode 100644
index 0000000..8068e6e
--- /dev/null
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -0,0 +1,87 @@
+/* -*- OpenSAF -*-
+ *
+ * (C) Copyright 2019 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are
licensed
+ * under the GNU Lesser General Public License Version 2.1,
February 1999.
+ * The complete license can be accessed from the following location:
+ *
https://protect2.fireeye.com/url?k=226846f8-7ee1e993-22680663-0cc47ad93ea2-e9b41ced4c57c419&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for
full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+
+#ifndef MDS_MDS_TIPC_FCTRL_PORTID_H_
+#define MDS_MDS_TIPC_FCTRL_PORTID_H_
+
+#include <linux/tipc.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <deque>
+#include "mds/mds_tipc_fctrl_msg.h"
+
+namespace mds {
+
+class MessageQueue {
+ public:
+ void Queue(DataMessage* msg);
+ DataMessage* Find(uint32_t mseq, uint16_t mfrag);
+ uint64_t Erase(uint16_t fseq_from, uint16_t fseq_to);
+ uint64_t Size() const { return queue_.size(); }
+ void Clear();
+ private:
+ std::deque<DataMessage*> queue_;
+};
+
+class TipcPortId {
+ public:
+ TipcPortId(struct tipc_portid id, int sock, uint16_t chunk_size,
+ uint64_t sock_buf_size);
+ ~TipcPortId();
+ static uint64_t GetUniqueId(struct tipc_portid id);
+ int GetSock() const { return bsrsock_; }
+ uint16_t GetCurrentSeq() { return sndwnd_.send_; }
+ bool ReceiveCapable(uint16_t sending_len);
+ void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size);
+ void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t
chunk_size);
+ uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
+ uint16_t fseq, uint16_t svc_id);
+ void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
+ uint32_t Send(uint8_t* data, uint16_t length);
+ uint32_t Queue(const uint8_t* data, uint16_t length);
+
+ uint16_t svc_cnt_{1}; // number of service subscribed on this
portid
+
+ private:
+ struct tipc_portid id_;
+ int bsrsock_; // tipc socket to send/receive data per tipc_portid
+ uint16_t chunk_size_{5};
+ uint64_t rcv_buf_size_{0}; // estimated buffer size at receiver
+
+ struct sndwnd {
+ // sender sequence window
+ uint16_t acked_{0}; // last sequence has been acked by receiver
+ uint16_t send_{1}; // next sequence to be sent
+ uint64_t nacked_space_{0}; // total bytes are sent but not acked
+ };
+ struct sndwnd sndwnd_;
+
+ struct rcvwnd {
+ // receiver sequence window
+ uint16_t acked_{0}; // last sequence has been acked to sender
+ uint16_t rcv_{0}; // last sequence has been received
+ uint64_t nacked_space_{0}; // total bytes has not been acked
+ };
+ struct rcvwnd rcvwnd_;
+
+ MessageQueue sndqueue_;
+};
+
+} // end namespace mds
+#endif // MDS_MDS_TIPC_FCTRL_PORTID_H_