When recovering from a split-brain, both active director
services may lose MDS messages because the TIPC link lost
contact. If MDS_TIPC_FCTRL_ENABLED is set, an out-of-order
message is dropped, and there is currently no mechanism for
the receiver to trigger retransmission (retransmission is
only triggered by the sender as a result of
TIPC_ERR_OVERLOAD).

When the receiver detects an out-of-order message, it can
send a negative acknowledgement (Nack) asking the sender to
retransmit. The sender then retransmits in the same way as
when it receives TIPC_ERR_OVERLOAD.

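This patch adds a Nack message so that the receiver can
request retransmission of an out-of-order message, and adds
a missing call to portid_map_mutex.unlock() in
process_all_events(). It also:
- looks up the message to resend by fseq instead of by
  (mseq, mfrag),
- treats any fseq <= the last received fseq as an unexpected
  retransmission (previously only fseq <= the last acked fseq),
- uses delete[] instead of delete for msg_data_ in
  ~DataMessage(),
- zero-initializes the SendChunkAck() buffer,
- logs "MSG loss not enabled" at NOTIFY level and the
  reassembly timer event at ERR level.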
---
 src/mds/mds_c_api.c              |  2 +-
 src/mds/mds_dt_common.c          |  2 +-
 src/mds/mds_tipc_fctrl_intf.cc   | 25 ++++++++++++++++++++++---
 src/mds/mds_tipc_fctrl_msg.cc    | 35 ++++++++++++++++++++++++++++++++++-
 src/mds/mds_tipc_fctrl_msg.h     | 22 ++++++++++++++++++++++
 src/mds/mds_tipc_fctrl_portid.cc | 32 ++++++++++++++++++++++++--------
 src/mds/mds_tipc_fctrl_portid.h  |  3 ++-
 7 files changed, 106 insertions(+), 15 deletions(-)

diff --git a/src/mds/mds_c_api.c b/src/mds/mds_c_api.c
index c41c8dd..132555b 100644
--- a/src/mds/mds_c_api.c
+++ b/src/mds/mds_c_api.c
@@ -4196,7 +4196,7 @@ void mds_mcm_msg_loss(MDS_SVC_HDL local_svc_hdl, MDS_DEST rem_adest,
 
        /* Check whether the msg loss is enabled or not */
        if (true != local_svc_info->i_msg_loss_indication) {
-               m_MDS_LOG_INFO(" MSG loss not enbaled mds_mcm_msg_loss\n");
+               m_MDS_LOG_NOTIFY("MSG loss is not enabled mds_mcm_msg_loss\n");
                return;
        }
 
diff --git a/src/mds/mds_dt_common.c b/src/mds/mds_dt_common.c
index 66652af..de13883 100644
--- a/src/mds/mds_dt_common.c
+++ b/src/mds/mds_dt_common.c
@@ -972,7 +972,7 @@ uint32_t mds_tmr_mailbox_processing(void)
                                        .vdest_id);
                                break;
                        case MDS_REASSEMBLY_TMR:
-                               m_MDS_LOG_DBG(
+                               m_MDS_LOG_ERR(
                                    "MDTM: Tmr Mailbox Processing:Reassemble Tmr Hdl=0x%08x",
                                    mbx_evt_info->info.tmr_info_hdl);
                                mdtm_process_reassem_timer_event(
diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index 2366672..8018064 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -38,6 +38,7 @@ using mds::Timer;
 using mds::DataMessage;
 using mds::ChunkAck;
 using mds::HeaderMessage;
+using mds::Nack;
 
 namespace {
 // flow control enabled/disabled
@@ -142,7 +143,8 @@ uint32_t process_flow_event(const Event& evt) {
     if (evt.type_ == Event::Type::kEvtSendChunkAck) {
       portid->SendChunkAck(evt.fseq_, evt.svc_id_, evt.chunk_size_);
     }
-    if (evt.type_ == Event::Type::kEvtDropData) {
+    if (evt.type_ == Event::Type::kEvtDropData ||
+        evt.type_ == Event::Type::kEvtRcvNack) {
       portid->ReceiveNack(evt.mseq_, evt.mfrag_,
           evt.fseq_);
     }
@@ -178,8 +180,10 @@ uint32_t process_all_events(void) {
         Event *evt = reinterpret_cast<Event*>(ncs_ipc_non_blk_recv(
             &mbx_events));
 
-        if (evt == nullptr) continue;
-
+        if (evt == nullptr) {
+          portid_map_mutex.unlock();
+          continue;
+        }
         if (evt->IsTimerEvent()) {
           process_timer_event(*evt);
         }
@@ -464,6 +468,21 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
         // skip this data msg
         return NCSCC_RC_FAILURE;
       }
+      if (header.msg_type_ == Nack::kNackMsgType) {
+        // receive nack message
+        Nack nack;
+        nack.Decode(buffer);
+        // send to the event thread
+        if (m_NCS_IPC_SEND(&mbx_events,
+            new Event(Event::Type::kEvtRcvNack, id, nack.svc_id_,
+                header.mseq_, header.mfrag_, nack.nacked_fseq_),
+                NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
+          m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n");
+        }
+        // return NCSCC_RC_FAILURE, so the tipc receiving thread (legacy) will
+        // skip this data msg
+        return NCSCC_RC_FAILURE;
+      }
     } else {
       // receive data message
       DataMessage data;
diff --git a/src/mds/mds_tipc_fctrl_msg.cc b/src/mds/mds_tipc_fctrl_msg.cc
index 064d977..0246b65 100644
--- a/src/mds/mds_tipc_fctrl_msg.cc
+++ b/src/mds/mds_tipc_fctrl_msg.cc
@@ -96,7 +96,7 @@ void DataMessage::Decode(uint8_t *msg) {
 
 DataMessage::~DataMessage() {
   if (msg_data_ != nullptr) {
-    delete msg_data_;
+    delete[] msg_data_;
     msg_data_ = nullptr;
   }
 }
@@ -139,4 +139,37 @@ void ChunkAck::Decode(uint8_t *msg) {
   chunk_size_ = ncs_decode_16bit(&ptr);
 }
 
+
+Nack::Nack(uint16_t svc_id, uint16_t fseq):  // fseq: the nacked fseq
+    svc_id_(svc_id), nacked_fseq_(fseq) {
+  msg_type_ = kNackMsgType;  // Encode() always writes kNackMsgType
+}
+
+void Nack::Encode(uint8_t *msg) {  // msg must hold >= kNackMsgLength bytes
+  uint8_t *ptr;
+  // encode the 32-bit protocol identifier (MDS_PROT_FCTRL_ID)
+  ptr = &msg[Nack::FieldIndex::kProtocolIdentifier];
+  ncs_encode_32bit(&ptr, MDS_PROT_FCTRL_ID);
+  // encode the 8-bit message type (always kNackMsgType)
+  ptr = &msg[Nack::FieldIndex::kFlowControlMessageType];
+  ncs_encode_8bit(&ptr, kNackMsgType);
+  // encode the 16-bit service id
+  ptr = &msg[Nack::FieldIndex::kServiceId];
+  ncs_encode_16bit(&ptr, svc_id_);
+  // encode the 16-bit flow control sequence number being nacked
+  ptr = &msg[Nack::FieldIndex::kFlowControlSequenceNumber];
+  ncs_encode_16bit(&ptr, nacked_fseq_);
+}
+
+void Nack::Decode(uint8_t *msg) {  // msg must hold >= kNackMsgLength bytes
+  uint8_t *ptr;
+  // Only svc id and fseq are decoded; msg_type_ is left unchanged.
+  // decode service id
+  ptr = &msg[Nack::FieldIndex::kServiceId];
+  svc_id_ = ncs_decode_16bit(&ptr);
+  // decode flow control sequence number
+  ptr = &msg[Nack::FieldIndex::kFlowControlSequenceNumber];
+  nacked_fseq_ = ncs_decode_16bit(&ptr);
+}
+
 }  // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
index d67ed19..e1db200 100644
--- a/src/mds/mds_tipc_fctrl_msg.h
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -44,6 +44,7 @@ class Event {
                            // selective data msgs (not supported)
     kEvtDropData,          // event reported from tipc that a message is not
                            // delivered
+    kEvtRcvNack,           // event that received nack message
     kEvtTmrAll,
     kEvtTmrTxProb,    // event that tx probation timer expired for once
     kEvtTmrChunkAck,  // event to send the chunk ack
@@ -150,6 +151,27 @@ class ChunkAck: public BaseMessage {
   void Decode(uint8_t *msg) override;
 };
 
+class Nack: public BaseMessage {  // flow control negative acknowledgement
+ public:
+  enum FieldIndex {  // byte offsets from the start of the message buffer
+    kProtocolIdentifier = 11,         // 32-bit MDS_PROT_FCTRL_ID
+    kFlowControlMessageType = 15,     // 8-bit message type
+    kServiceId = 16,                  // 16-bit service id
+    kFlowControlSequenceNumber = 18,  // 16-bit nacked fseq
+  };
+  static const uint8_t kNackMsgType = 2;
+  static const uint16_t kNackMsgLength = 20;  // ends after the 16-bit fseq
+
+  uint8_t msg_type_{0};
+  uint16_t svc_id_{0};
+  uint16_t nacked_fseq_{0};  // fseq the receiver asks the sender to resend
+  Nack() {}
+  Nack(uint16_t svc_id, uint16_t fseq);
+  virtual ~Nack() {}
+  void Encode(uint8_t *msg) override;
+  void Decode(uint8_t *msg) override;
+};
+
 }  // end namespace mds
 
 #endif  // MDS_MDS_TIPC_FCTRL_MSG_H_
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 1ce792d..8235265 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -57,10 +57,10 @@ void MessageQueue::Queue(DataMessage* msg) {
   queue_.push_back(msg);
 }
 
-DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) {
+DataMessage* MessageQueue::Find(Seq16 fseq) {
   for (const auto& it : queue_) {
     DataMessage *m = it;
-    if (m->header_.mseq_ == mseq && m->header_.mfrag_ == mfrag) {
+    if (Seq16(m->header_.fseq_) == fseq) {
       return m;
     }
   }
@@ -224,20 +224,34 @@ bool TipcPortId::ReceiveCapable(uint16_t sending_len) {
 
 void TipcPortId::SendChunkAck(uint16_t fseq, uint16_t svc_id,
     uint16_t chksize) {
-  uint8_t data[ChunkAck::kChunkAckMsgLength];
+  uint8_t data[ChunkAck::kChunkAckMsgLength] = {0};
 
   HeaderMessage header(ChunkAck::kChunkAckMsgLength, 0, 0, fseq);
-  header.Encode(reinterpret_cast<uint8_t*>(&data));
+  header.Encode(data);
 
   ChunkAck sack(svc_id, fseq, chksize);
-  sack.Encode(reinterpret_cast<uint8_t*>(&data));
-  Send(reinterpret_cast<uint8_t*>(&data), ChunkAck::kChunkAckMsgLength);
+  sack.Encode(data);
+  Send(data, ChunkAck::kChunkAckMsgLength);
   m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
       "SndChkAck[fseq:%u, chunk:%u]",
       id_.node, id_.ref,
       fseq, chksize);
 }
 
+void TipcPortId::SendNack(uint16_t fseq, uint16_t svc_id) {
+  uint8_t data[Nack::kNackMsgLength] = {0};
+  // The header and the Nack fields are encoded into the same buffer.
+  HeaderMessage header(Nack::kNackMsgLength, 0, 0, fseq);
+  header.Encode(data);
+  // The Nack body repeats the fseq passed to the header.
+  Nack nack(svc_id, fseq);
+  nack.Encode(data);
+  Send(data, Nack::kNackMsgLength);  // NOTE(review): result is not checked
+  m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+      "SndNack[fseq:%u]", id_.node, id_.ref, fseq);
+}
+
+
 uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
     uint16_t fseq, uint16_t svc_id) {
   uint32_t rc = NCSCC_RC_SUCCESS;
@@ -312,9 +326,11 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
             id_.node, id_.ref,
             mseq, mfrag, fseq,
             rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
+        // send nack
+        SendNack((rcvwnd_.rcv_ + Seq16(1)).v(), svc_id);
       }
     }
-    if (Seq16(fseq) <= rcvwnd_.acked_) {
+    if (Seq16(fseq) <= rcvwnd_.rcv_) {
       rc = NCSCC_RC_FAILURE;
       // unexpected retransmission
       m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -438,7 +454,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
         id_.node, id_.ref);
     sndqueue_.MarkUnsentFrom(Seq16(fseq));
   }
-  DataMessage* msg = sndqueue_.Find(mseq, mfrag);
+  DataMessage* msg = sndqueue_.Find(Seq16(fseq));
   if (msg != nullptr) {
     // Resend the msg found
     if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h
index cf2daaa..d238ac6 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -94,7 +94,7 @@ class Seq16 {
 class MessageQueue {
  public:
   void Queue(DataMessage* msg);
-  DataMessage* Find(uint32_t mseq, uint16_t mfrag);
+  DataMessage* Find(Seq16 fseq);
   uint64_t Erase(Seq16 fseq_from, Seq16 fseq_to);
   uint64_t Size() const { return queue_.size(); }
   void Clear();
@@ -133,6 +133,7 @@ class TipcPortId {
   bool ReceiveCapable(uint16_t sending_len);
   void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size);
   void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size);
+  void SendNack(uint16_t fseq, uint16_t svc_id);
   uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
       uint16_t fseq, uint16_t svc_id);
   void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
-- 
2.7.4



_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to