MDS relies on data messages sent from the peer to determine
whether MDS_TIPC_FCTRL_ENABLED is set at the peer. A data message
may not be sent right after the TIPC_PUBLISHED event, which can
cause the tx probation timer to time out.

This patch adds a Reset message, which is sent right after
TIPC_PUBLISHED so that MDS can determine earlier whether flow
control is supported at the peer.
---
 src/mds/mds_main.c               |  2 +-
 src/mds/mds_tipc_fctrl_intf.cc   | 27 ++++++++++++++++++++++
 src/mds/mds_tipc_fctrl_msg.cc    | 11 +++++++++
 src/mds/mds_tipc_fctrl_msg.h     | 18 +++++++++++++++
 src/mds/mds_tipc_fctrl_portid.cc | 49 ++++++++++++++++++++++++++++++----------
 src/mds/mds_tipc_fctrl_portid.h  |  2 ++
 6 files changed, 96 insertions(+), 13 deletions(-)

diff --git a/src/mds/mds_main.c b/src/mds/mds_main.c
index 8c9b1f1..c7d2f7b 100644
--- a/src/mds/mds_main.c
+++ b/src/mds/mds_main.c
@@ -408,7 +408,7 @@ uint32_t mds_lib_req(NCS_LIB_REQ_INFO *req)
                                if (tipc_mcast_enabled != false)
                                        tipc_mcast_enabled = true;
 
-                               m_MDS_LOG_DBG(
+                               m_MDS_LOG_NOTIFY(
                                    "MDS: TIPC_MCAST_ENABLED: %d  Set argument 
\n",
                                    tipc_mcast_enabled);
                        }
diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index 6271890..e8c9121 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -39,6 +39,7 @@ using mds::DataMessage;
 using mds::ChunkAck;
 using mds::HeaderMessage;
 using mds::Nack;
+using mds::Reset;
 
 namespace {
 // flow control enabled/disabled
@@ -124,12 +125,20 @@ uint32_t process_flow_event(const Event& evt) {
   uint32_t rc = NCSCC_RC_SUCCESS;
   TipcPortId *portid = portid_lookup(evt.id_);
   if (portid == nullptr) {
+    // the null portid normally should not happen; however because the
+    // tipc_cb.Dsock and tipc_cb.BSRsock are separated; the data message
+    // sent from BSRsock may come before reception of TIPC_PUBLISHED
     if (evt.type_ == Event::Type::kEvtRcvData) {
       portid = new TipcPortId(evt.id_, data_sock_fd,
           kChunkAckSize, sock_buf_size);
       portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
       rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
             evt.fseq_, evt.svc_id_);
+    } else if (evt.type_ == Event::Type::kEvtRcvReset) {
+      portid = new TipcPortId(evt.id_, data_sock_fd,
+          kChunkAckSize, sock_buf_size);
+      portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
+      portid->ReceiveReset();
     } else {
       m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
           "RcvEvt[evt:%d], Error[PortId not found]",
@@ -151,6 +160,9 @@ uint32_t process_flow_event(const Event& evt) {
       portid->ReceiveNack(evt.mseq_, evt.mfrag_,
           evt.fseq_);
     }
+    if (evt.type_ == Event::Type::kEvtRcvReset) {
+      portid->ReceiveReset();
+    }
   }
   return rc;
 }
@@ -489,6 +501,16 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
           m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]",
               strerror(errno));
         }
+      } else if (header.msg_type_ == Reset::kResetMsgType) {
+        // no need to decode reset message
+        // the decoding reset message type is done in header decoding
+        // send to the event thread
+        if (m_NCS_IPC_SEND(&mbx_events,
+            new Event(Event::Type::kEvtRcvReset, id, 0, 0, 0, 0),
+                NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
+          m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]",
+              strerror(errno));
+        }
       } else {
         m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
             "[msg_type:%u], Error[not supported message type]",
@@ -516,6 +538,11 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
       portid_map_mutex.unlock();
       return rc;
     }
+  } else {
+    m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+        "Receive non-flow-control data message, "
+        "header.pro_ver:%u",
+        id.node, id.ref, header.pro_ver_);
   }
   return NCSCC_RC_SUCCESS;
 }
diff --git a/src/mds/mds_tipc_fctrl_msg.cc b/src/mds/mds_tipc_fctrl_msg.cc
index 932120f..4aba3fb 100644
--- a/src/mds/mds_tipc_fctrl_msg.cc
+++ b/src/mds/mds_tipc_fctrl_msg.cc
@@ -178,4 +178,15 @@ void Nack::Decode(uint8_t *msg) {
   nacked_fseq_ = ncs_decode_16bit(&ptr);
 }
 
+
+void Reset::Encode(uint8_t *msg) {
+  // Writes only the protocol identifier and the Reset message type;
+  // every other byte of msg is left as the caller prepared it.
+  uint8_t *field = msg + Reset::FieldIndex::kProtocolIdentifier;
+  ncs_encode_32bit(&field, MDS_PROT_FCTRL_ID);
+
+  field = msg + Reset::FieldIndex::kFlowControlMessageType;
+  ncs_encode_8bit(&field, kResetMsgType);
+}
+
 }  // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
index e1db200..e94fb9d 100644
--- a/src/mds/mds_tipc_fctrl_msg.h
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -45,6 +45,7 @@ class Event {
     kEvtDropData,          // event reported from tipc that a message is not
                            // delivered
     kEvtRcvNack,           // event that received nack message
+    kEvtRcvReset,          // event that received reset message
     kEvtTmrAll,
     kEvtTmrTxProb,    // event that tx probation timer expired for once
     kEvtTmrChunkAck,  // event to send the chunk ack
@@ -172,6 +173,23 @@ class Nack: public BaseMessage {
   void Decode(uint8_t *msg) override;
 };
 
+// Reset message: advertises to the peer that flow control is enabled here.
+class Reset: public BaseMessage {
+ public:
+  enum FieldIndex {
+    kProtocolIdentifier = 11,      // 32-bit protocol id (MDS_PROT_FCTRL_ID)
+    kFlowControlMessageType = 15,  // 8-bit message type (kResetMsgType)
+  };
+  static const uint8_t kResetMsgType = 3;
+  static const uint16_t kResetMsgLength = 16;
+  uint8_t msg_type_{0};
+
+  Reset() : msg_type_(kResetMsgType) {}
+  virtual ~Reset() {}
+  void Encode(uint8_t *msg) override;
+};
+
+
 }  // end namespace mds
 
 #endif  // MDS_MDS_TIPC_FCTRL_MSG_H_
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 24a7e2a..f195b01 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -112,6 +112,7 @@ TipcPortId::TipcPortId(struct tipc_portid id, int sock, uint16_t chksize,
     uint64_t sock_buf_size):
   id_(id), bsrsock_(sock), chunk_size_(chksize), rcv_buf_size_(sock_buf_size) {
   state_ = State::kStartup;
+  SendReset();
 }
 
 TipcPortId::~TipcPortId() {
@@ -189,7 +190,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
         sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
   } else {
     ++sndwnd_.send_;
-    m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+    m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
         "QueData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
         "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
         id_.node, id_.ref,
@@ -248,10 +249,22 @@ void TipcPortId::SendNack(uint16_t fseq, uint16_t svc_id) {
   Nack nack(svc_id, fseq);
   nack.Encode(data);
   Send(data, Nack::kNackMsgLength);
-  m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
       "SndNack[fseq:%u]", id_.node, id_.ref, fseq);
 }
 
+void TipcPortId::SendReset() {
+  // Header and Reset fields are encoded into the same zeroed buffer.
+  uint8_t data[Reset::kResetMsgLength] = {0};
+  HeaderMessage hdr(Reset::kResetMsgLength, 0, 0, 0);
+  hdr.Encode(data);
+  Reset reset_msg;
+  reset_msg.Encode(data);
+  // The Send() result is not checked; the log below is unconditional.
+  Send(data, Reset::kResetMsgLength);
+  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
+      "SndReset", id_.node, id_.ref);
+}
 
 uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
     uint16_t fseq, uint16_t svc_id) {
@@ -330,8 +343,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
         // send nack
         SendNack((rcvwnd_.rcv_ + Seq16(1)).v(), svc_id);
       }
-    }
-    if (Seq16(fseq) <= rcvwnd_.rcv_) {
+    } else if (Seq16(fseq) <= rcvwnd_.rcv_) {
       rc = NCSCC_RC_FAILURE;
       // unexpected retransmission
       m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -399,7 +411,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
             sndwnd_.nacked_space_ += msg->header_.msg_len_;
             msg->is_sent_ = true;
             resend_bytes += msg->header_.msg_len_;
-            m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+            m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
                 "SndQData[fseq:%u, len:%u], "
                 "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
                 id_.node, id_.ref,
@@ -443,7 +455,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
   }
   if (state_ == State::kRcvBuffOverflow) {
     m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
-        "RcvNack[fseq:%u, state:%u]"
+        "RcvNack[fseq:%u, state:%u], "
         "Warning[Ignore Nack]",
         id_.node, id_.ref,
         fseq, (uint8_t)state_);
@@ -462,7 +474,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
     if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
       msg->is_sent_ = true;
     }
-    m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+    m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
         "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
         "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
         id_.node, id_.ref,
@@ -495,12 +507,11 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) {
     // receiver is at old mds version
     if (state_ == State::kDisabled) {
       FlushData();
+      m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
+          "TxProbExp, TxProb[retries:%u, state:%u]",
+          id_.node, id_.ref,
+          txprob_cnt_, (uint8_t)state_);
     }
-
-    m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
-        "TxProbExp, TxProb[retries:%u, state:%u]",
-        id_.node, id_.ref,
-        txprob_cnt_, (uint8_t)state_);
   }
   return restart_txprob;
 }
@@ -518,4 +529,18 @@ void TipcPortId::ReceiveTmrChunkAck() {
   }
 }
 
+void TipcPortId::ReceiveReset() {
+  m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
+      "RcvReset, TxProb[retries:%u, state:%u]",
+      id_.node, id_.ref, txprob_cnt_, (uint8_t)state_);
+  // Peer supports flow control: leave the startup/tx-probation states.
+  if (state_ == State::kStartup || state_ == State::kTxProb) {
+    state_ = State::kEnabled;
+  }
+  // Reset comes from a freshly constructed peer TipcPortId; clear rcvwnd_.
+  rcvwnd_.acked_ = 0;
+  rcvwnd_.rcv_ = 0;
+  rcvwnd_.nacked_space_ = 0;
+}
+
 }  // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h
index d238ac6..ffed2ad 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -134,11 +134,13 @@ class TipcPortId {
   void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size);
   void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size);
   void SendNack(uint16_t fseq, uint16_t svc_id);
+  void SendReset();
   uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
       uint16_t fseq, uint16_t svc_id);
   void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
   bool ReceiveTmrTxProb(uint8_t max_txprob);
   void ReceiveTmrChunkAck();
+  void ReceiveReset();
   void FlushData();
   uint32_t Send(uint8_t* data, uint16_t length);
   uint32_t Queue(const uint8_t* data, uint16_t length, bool is_sent);
-- 
2.7.4



_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to