The patch avoids message reallocation if the message is in retransmission queue --- src/mds/mds_dt_tipc.c | 68 +++++++++++++++++++++++----------------- src/mds/mds_tipc_fctrl_intf.cc | 6 ++-- src/mds/mds_tipc_fctrl_intf.h | 4 +-- src/mds/mds_tipc_fctrl_msg.cc | 2 +- src/mds/mds_tipc_fctrl_portid.cc | 9 ++---- 5 files changed, 50 insertions(+), 39 deletions(-)
diff --git a/src/mds/mds_dt_tipc.c b/src/mds/mds_dt_tipc.c index 722076f..3d4f468 100644 --- a/src/mds/mds_dt_tipc.c +++ b/src/mds/mds_dt_tipc.c @@ -120,7 +120,7 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req); /* Tipc actual send, can be made as Macro even*/ static uint32_t mdtm_sendto(uint8_t *buffer, uint16_t buff_len, - struct tipc_portid tipc_id); + struct tipc_portid tipc_id, uint8_t *is_queued); static uint32_t mdtm_mcast_sendto(void *buffer, size_t size, const MDTM_SEND_REQ *req); @@ -2643,7 +2643,8 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req) if (req->snd_type == MDS_SENDTYPE_ACK || req->snd_type == MDS_SENDTYPE_RACK) { uint8_t len = mds_and_mdtm_hdr_len; - uint8_t buffer_ack[len]; + uint8_t *buffer_ack = calloc(1, len); + uint8_t is_queued = 0; /* Add mds_hdr */ if (mdtm_add_mds_hdr(buffer_ack, req) @@ -2657,18 +2658,24 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req) &fctrl_seq_num) == NCSCC_RC_FAILURE){ m_MDS_LOG_ERR("FCTRL: Failed to send message" " len :%d", len); + free(buffer_ack); return NCSCC_RC_FAILURE; } /* Add frag_hdr */ if (mdtm_add_frag_hdr(buffer_ack, len, frag_seq_num, 0, fctrl_seq_num) != NCSCC_RC_SUCCESS) { + free(buffer_ack); return NCSCC_RC_FAILURE; } m_MDS_LOG_DBG("MDTM:Sending message with Service" " Seqno=%d, TO Dest_Tipc_id=<0x%08x:%u> ", req->svc_seq_num, tipc_id.node, tipc_id.ref); - return mdtm_sendto(buffer_ack, len, tipc_id); + status = mdtm_sendto(buffer_ack, len, tipc_id, + &is_queued); + if (is_queued == 0) + free(buffer_ack); + return status; } if (req->msg.encoding == MDS_ENC_TYPE_FLAT) { @@ -2730,6 +2737,7 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req) } else { uint8_t *p8; uint8_t *body = NULL; + uint8_t is_queued = 0; body = calloc(1, len + mds_and_mdtm_hdr_len); @@ -2806,8 +2814,8 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req) req->dest_svc_id); return NCSCC_RC_FAILURE; } - if (mdtm_mcast_sendto(body, len, req) - != NCSCC_RC_SUCCESS) { + status = mdtm_mcast_sendto(body, len, req); + if (status != NCSCC_RC_SUCCESS) { m_MDS_LOG_ERR("MDTM: Failed to" " send Multicast" " message Data lenght=%d" @@ -2819,24 +2827,20 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req) get_svc_names(req->dest_svc_id), req->dest_svc_id, strerror(errno)); - m_MMGR_FREE_BUFR_LIST(usrbuf); - free(body); - return NCSCC_RC_FAILURE; } } else { - if (mdtm_sendto(body, len, tipc_id) - != NCSCC_RC_SUCCESS) { + status = mdtm_sendto(body, len, + tipc_id, &is_queued); + if (status != NCSCC_RC_SUCCESS) { m_MDS_LOG_ERR("MDTM: Unable to" " send the msg thru" " TIPC\n"); - m_MMGR_FREE_BUFR_LIST(usrbuf); - free(body); - return NCSCC_RC_FAILURE; } } m_MMGR_FREE_BUFR_LIST(usrbuf); - free(body); - return NCSCC_RC_SUCCESS; + if (is_queued == 0) + free(body); + return status; } } break; @@ -2861,6 +2865,7 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req) req->dest_svc_id); uint8_t *body = NULL; + uint8_t is_queued = 0; body = calloc(1, (req->msg.data.buff_info.len + mds_and_mdtm_hdr_len)); @@ -2903,13 +2908,16 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req) req->msg.data.buff_info.buff, req->msg.data.buff_info.len); - if (mdtm_sendto(body, + status = mdtm_sendto(body, (req->msg.data.buff_info.len + - mds_and_mdtm_hdr_len), tipc_id) - != NCSCC_RC_SUCCESS) { + mds_and_mdtm_hdr_len), tipc_id, &is_queued); + + if (is_queued == 0) + free(body); + + if (status != NCSCC_RC_SUCCESS) { m_MDS_LOG_ERR("MDTM: Unable to send the msg" " thru TIPC\n"); - free(body); mds_free_direct_buff( req->msg.data.buff_info.buff); return NCSCC_RC_FAILURE; @@ -2920,13 +2928,12 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req) */ if (req->snd_type == MDS_SENDTYPE_BCAST || req->snd_type == MDS_SENDTYPE_RBCAST) { - /* Dont free Here */ + /* Dont free Here, WHY? */ } else { mds_free_direct_buff( req->msg.data.buff_info.buff); } - free(body); - return NCSCC_RC_SUCCESS; + return status; } break; default: @@ -3031,11 +3038,13 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req, uint32_t seq_num, uint32_t hdr_plus = (i == 1) ? mds_and_mdtm_hdr_len : MDTM_FRAG_HDR_PLUS_LEN_2; uint8_t *body = NULL; + uint8_t is_queued = 0; body = calloc(1, len_buf); p8 = (uint8_t *)m_MMGR_DATA_AT_START(usrbuf, len_buf - hdr_plus, (char *)(body + hdr_plus)); + if (p8 != (body + hdr_plus)) memcpy((body + hdr_plus), p8, len_buf - hdr_plus); if (i == 1) { @@ -3087,16 +3096,17 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req, uint32_t seq_num, " TO Dest_Tipc_id=<0x%08x:%u>", req->svc_seq_num, seq_num, frag_val, id.node, id.ref); - ret = mdtm_sendto(body, len_buf, id); + ret = mdtm_sendto(body, len_buf, id, &is_queued); } + + if (is_queued == 0) + free(body); if (ret != NCSCC_RC_SUCCESS) { /* Failed to send a fragmented msg, stop sending */ m_MMGR_FREE_BUFR_LIST(usrbuf); - free(body); break; } m_MMGR_REMOVE_FROM_START(&usrbuf, len_buf - hdr_plus); - free(body); len = len - (len_buf - hdr_plus); if (len == 0) break; @@ -3171,7 +3181,7 @@ uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t len, uint32_t seq_num, *********************************************************/ static uint32_t mdtm_sendto(uint8_t *buffer, uint16_t buff_len, - struct tipc_portid id) + struct tipc_portid id, uint8_t *is_queued) { /* Can be made as macro even */ struct sockaddr_tipc server_addr; @@ -3197,9 +3207,11 @@ static uint32_t mdtm_sendto(uint8_t *buffer, uint16_t buff_len, buffer[4] = checksum; } #endif - if (mds_tipc_fctrl_trysend(buffer, buff_len, id) == NCSCC_RC_SUCCESS) { + + if (mds_tipc_fctrl_trysend(id, buffer, buff_len, is_queued) + == NCSCC_RC_SUCCESS) { send_len = sendto(tipc_cb.BSRsock, buffer, buff_len, 0, - (struct sockaddr *)&server_addr, sizeof(server_addr)); + (struct sockaddr *)&server_addr, sizeof(server_addr)); if (send_len == buff_len) { m_MDS_LOG_INFO("MDTM: Successfully sent message"); return NCSCC_RC_SUCCESS; diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc index 0e3230a..7d0571e 100644 --- a/src/mds/mds_tipc_fctrl_intf.cc +++ b/src/mds/mds_tipc_fctrl_intf.cc @@ -339,8 +339,9 @@ uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, return rc; } -uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len, - struct tipc_portid id) { +uint32_t mds_tipc_fctrl_trysend(struct tipc_portid id, const uint8_t *buffer, + uint16_t len, uint8_t* is_queued) { + *is_queued = 0; if (is_fctrl_enabled == false) return NCSCC_RC_SUCCESS; uint32_t rc = NCSCC_RC_SUCCESS; @@ -357,6 +358,7 @@ uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len, if (portid->state_ != TipcPortId::State::kDisabled) { bool sendable = portid->ReceiveCapable(len); portid->Queue(buffer, len, sendable); + *is_queued = 1; // start txprob timer for the first msg sent out // do not start for other states if (sendable && portid->state_ == TipcPortId::State::kStartup) { diff --git a/src/mds/mds_tipc_fctrl_intf.h b/src/mds/mds_tipc_fctrl_intf.h index ed9c6a8..f9bbc2d 100644 --- a/src/mds/mds_tipc_fctrl_intf.h +++ b/src/mds/mds_tipc_fctrl_intf.h @@ -39,8 +39,8 @@ uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len, struct tipc_portid id); uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, uint16_t* next_seq); -uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len, - struct tipc_portid id); +uint32_t mds_tipc_fctrl_trysend(struct tipc_portid id, const uint8_t *buffer, + uint16_t len, uint8_t* is_queued); #ifdef __cplusplus } #endif diff --git a/src/mds/mds_tipc_fctrl_msg.cc b/src/mds/mds_tipc_fctrl_msg.cc index 454c02c..0f9fd09 100644 --- a/src/mds/mds_tipc_fctrl_msg.cc +++ b/src/mds/mds_tipc_fctrl_msg.cc @@ -138,7 +138,7 @@ void DataMessage::Decode(uint8_t *msg) { DataMessage::~DataMessage() { if (msg_data_ != nullptr) { - delete[] msg_data_; + free(msg_data_); msg_data_ = nullptr; } } diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index 316e1ba..e974543 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -175,13 +175,12 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length, DataMessage *msg = new DataMessage; msg->header_.Decode(const_cast<uint8_t*>(data)); msg->Decode(const_cast<uint8_t*>(data)); - msg->msg_data_ = new uint8_t[length]; msg->is_sent_ = is_sent; - memcpy(msg->msg_data_, data, length); + msg->msg_data_ = const_cast<uint8_t*>(data); sndqueue_.Queue(msg); + ++sndwnd_.send_; + sndwnd_.nacked_space_ += length; if (is_sent) { - ++sndwnd_.send_; - sndwnd_.nacked_space_ += length; m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " "SndData[mseq:%u, mfrag:%u, fseq:%u, len:%u], " "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", @@ -189,8 +188,6 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length, msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length, sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } else { - ++sndwnd_.send_; - sndwnd_.nacked_space_ += length; m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "QueData[mseq:%u, mfrag:%u, fseq:%u, len:%u], " "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", -- 2.7.4 _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel