Hi,

The counter resets in ReceiveIntro() will be removed.

Thanks

Minh


On 15/10/19 12:50 pm, Minh Chau wrote:
MDS relies on a data message sent from the peer to determine
whether MDS_TIPC_FCTRL_ENABLED is set. The data message
may not be sent right after the TIPC_PUBLISHED event, which can
cause the tx probation timer to time out.

This patch adds an Intro message, which is sent right after the
TIPC_PUBLISHED event to help MDS determine earlier whether flow
control is supported at the peer.
---
  src/mds/mds_main.c               |  2 +-
  src/mds/mds_tipc_fctrl_intf.cc   | 27 ++++++++++++++++++++++
  src/mds/mds_tipc_fctrl_msg.cc    | 11 +++++++++
  src/mds/mds_tipc_fctrl_msg.h     | 18 +++++++++++++++
  src/mds/mds_tipc_fctrl_portid.cc | 49 ++++++++++++++++++++++++++++++----------
  src/mds/mds_tipc_fctrl_portid.h  |  2 ++
  6 files changed, 96 insertions(+), 13 deletions(-)

diff --git a/src/mds/mds_main.c b/src/mds/mds_main.c
index 8c9b1f1..c7d2f7b 100644
--- a/src/mds/mds_main.c
+++ b/src/mds/mds_main.c
@@ -408,7 +408,7 @@ uint32_t mds_lib_req(NCS_LIB_REQ_INFO *req)
                                if (tipc_mcast_enabled != false)
                                        tipc_mcast_enabled = true;
- m_MDS_LOG_DBG(
+                               m_MDS_LOG_NOTIFY(
                                    "MDS: TIPC_MCAST_ENABLED: %d  Set argument 
\n",
                                    tipc_mcast_enabled);
                        }
diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index 6271890..b803bfe 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -39,6 +39,7 @@ using mds::DataMessage;
  using mds::ChunkAck;
  using mds::HeaderMessage;
  using mds::Nack;
+using mds::Intro;
namespace {
  // flow control enabled/disabled
@@ -124,12 +125,20 @@ uint32_t process_flow_event(const Event& evt) {
    uint32_t rc = NCSCC_RC_SUCCESS;
    TipcPortId *portid = portid_lookup(evt.id_);
    if (portid == nullptr) {
+    // the null portid normally should not happen; however because the
+    // tipc_cb.Dsock and tipc_cb.BSRsock are separated; the data message
+    // sent from BSRsock may come before reception of TIPC_PUBLISHED
      if (evt.type_ == Event::Type::kEvtRcvData) {
        portid = new TipcPortId(evt.id_, data_sock_fd,
            kChunkAckSize, sock_buf_size);
        portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
        rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
              evt.fseq_, evt.svc_id_);
+    } else if (evt.type_ == Event::Type::kEvtRcvIntro) {
+      portid = new TipcPortId(evt.id_, data_sock_fd,
+          kChunkAckSize, sock_buf_size);
+      portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
+      portid->ReceiveIntro();
      } else {
        m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
            "RcvEvt[evt:%d], Error[PortId not found]",
@@ -151,6 +160,9 @@ uint32_t process_flow_event(const Event& evt) {
        portid->ReceiveNack(evt.mseq_, evt.mfrag_,
            evt.fseq_);
      }
+    if (evt.type_ == Event::Type::kEvtRcvIntro) {
+      portid->ReceiveIntro();
+    }
    }
    return rc;
  }
@@ -489,6 +501,16 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t 
len,
            m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]",
                strerror(errno));
          }
+      } else if (header.msg_type_ == Intro::kIntroMsgType) {
+        // no need to decode intro message
+        // the decoding intro message type is done in header decoding
+        // send to the event thread
+        if (m_NCS_IPC_SEND(&mbx_events,
+            new Event(Event::Type::kEvtRcvIntro, id, 0, 0, 0, 0),
+                NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
+          m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]",
+              strerror(errno));
+        }
        } else {
          m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
              "[msg_type:%u], Error[not supported message type]",
@@ -516,6 +538,11 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t 
len,
        portid_map_mutex.unlock();
        return rc;
      }
+  } else {
+    m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+        "Receive non-flow-control data message, "
+        "header.pro_ver:%u",
+        id.node, id.ref, header.pro_ver_);
    }
    return NCSCC_RC_SUCCESS;
  }
diff --git a/src/mds/mds_tipc_fctrl_msg.cc b/src/mds/mds_tipc_fctrl_msg.cc
index 932120f..180dcb6 100644
--- a/src/mds/mds_tipc_fctrl_msg.cc
+++ b/src/mds/mds_tipc_fctrl_msg.cc
@@ -178,4 +178,15 @@ void Nack::Decode(uint8_t *msg) {
    nacked_fseq_ = ncs_decode_16bit(&ptr);
  }
+
+void Intro::Encode(uint8_t *msg) {
+  uint8_t *ptr;
+  // encode protocol identifier
+  ptr = &msg[Intro::FieldIndex::kProtocolIdentifier];
+  ncs_encode_32bit(&ptr, MDS_PROT_FCTRL_ID);
+  // encode message type
+  ptr = &msg[Intro::FieldIndex::kFlowControlMessageType];
+  ncs_encode_8bit(&ptr, kIntroMsgType);
+}
+
  }  // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
index e1db200..3e45fa6 100644
--- a/src/mds/mds_tipc_fctrl_msg.h
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -45,6 +45,7 @@ class Event {
      kEvtDropData,          // event reported from tipc that a message is not
                             // delivered
      kEvtRcvNack,           // event that received nack message
+    kEvtRcvIntro,          // event that received intro message
      kEvtTmrAll,
      kEvtTmrTxProb,    // event that tx probation timer expired for once
      kEvtTmrChunkAck,  // event to send the chunk ack
@@ -172,6 +173,23 @@ class Nack: public BaseMessage {
    void Decode(uint8_t *msg) override;
  };
+class Intro: public BaseMessage {
+ public:
+  enum FieldIndex {
+    kProtocolIdentifier = 11,
+    kFlowControlMessageType = 15,
+  };
+  static const uint8_t kIntroMsgType = 3;
+  static const uint16_t kIntroMsgLength = 16;
+
+  uint8_t msg_type_{0};
+
+  Intro() { msg_type_ = kIntroMsgType; }
+  virtual ~Intro() {}
+  void Encode(uint8_t *msg) override;
+};
+
+
  }  // end namespace mds
#endif // MDS_MDS_TIPC_FCTRL_MSG_H_
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 24a7e2a..5b882c9 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -112,6 +112,7 @@ TipcPortId::TipcPortId(struct tipc_portid id, int sock, 
uint16_t chksize,
      uint64_t sock_buf_size):
    id_(id), bsrsock_(sock), chunk_size_(chksize), rcv_buf_size_(sock_buf_size) 
{
    state_ = State::kStartup;
+  SendIntro();
  }
TipcPortId::~TipcPortId() {
@@ -189,7 +190,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t 
length,
          sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
    } else {
      ++sndwnd_.send_;
-    m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+    m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
          "QueData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
          "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
          id_.node, id_.ref,
@@ -248,10 +249,22 @@ void TipcPortId::SendNack(uint16_t fseq, uint16_t svc_id) 
{
    Nack nack(svc_id, fseq);
    nack.Encode(data);
    Send(data, Nack::kNackMsgLength);
-  m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
        "SndNack[fseq:%u]", id_.node, id_.ref, fseq);
  }
+void TipcPortId::SendIntro() {
+  uint8_t data[Intro::kIntroMsgLength] = {0};
+
+  HeaderMessage header(Intro::kIntroMsgLength, 0, 0, 0);
+  header.Encode(data);
+
+  Intro intro;
+  intro.Encode(data);
+  Send(data, Intro::kIntroMsgLength);
+  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
+      "SndIntro", id_.node, id_.ref);
+}
uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
      uint16_t fseq, uint16_t svc_id) {
@@ -330,8 +343,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t 
mfrag,
          // send nack
          SendNack((rcvwnd_.rcv_ + Seq16(1)).v(), svc_id);
        }
-    }
-    if (Seq16(fseq) <= rcvwnd_.rcv_) {
+    } else if (Seq16(fseq) <= rcvwnd_.rcv_) {
        rc = NCSCC_RC_FAILURE;
        // unexpected retransmission
        m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -399,7 +411,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
chksize) {
              sndwnd_.nacked_space_ += msg->header_.msg_len_;
              msg->is_sent_ = true;
              resend_bytes += msg->header_.msg_len_;
-            m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+            m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
                  "SndQData[fseq:%u, len:%u], "
                  "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
                  id_.node, id_.ref,
@@ -443,7 +455,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
    }
    if (state_ == State::kRcvBuffOverflow) {
      m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
-        "RcvNack[fseq:%u, state:%u]"
+        "RcvNack[fseq:%u, state:%u], "
          "Warning[Ignore Nack]",
          id_.node, id_.ref,
          fseq, (uint8_t)state_);
@@ -462,7 +474,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
      if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
        msg->is_sent_ = true;
      }
-    m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+    m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
          "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
          "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
          id_.node, id_.ref,
@@ -495,12 +507,11 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) {
      // receiver is at old mds version
      if (state_ == State::kDisabled) {
        FlushData();
+      m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
+          "TxProbExp, TxProb[retries:%u, state:%u]",
+          id_.node, id_.ref,
+          txprob_cnt_, (uint8_t)state_);
      }
-
-    m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
-        "TxProbExp, TxProb[retries:%u, state:%u]",
-        id_.node, id_.ref,
-        txprob_cnt_, (uint8_t)state_);
    }
    return restart_txprob;
  }
@@ -518,4 +529,18 @@ void TipcPortId::ReceiveTmrChunkAck() {
    }
  }
+void TipcPortId::ReceiveIntro() {
+  m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
+      "RcvIntro, "
+      "TxProb[retries:%u, state:%u]",
+      id_.node, id_.ref,
+      txprob_cnt_, (uint8_t)state_);
+  if (state_ == State::kStartup || state_ == State::kTxProb) {
+    state_ = State::kEnabled;
+  }
+  rcvwnd_.acked_ = 0;
+  rcvwnd_.rcv_ = 0;
+  rcvwnd_.acked_ = 0;
+}
+
  }  // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h
index d238ac6..bb569f1 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -134,11 +134,13 @@ class TipcPortId {
    void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size);
    void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size);
    void SendNack(uint16_t fseq, uint16_t svc_id);
+  void SendIntro();
    uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
        uint16_t fseq, uint16_t svc_id);
    void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
    bool ReceiveTmrTxProb(uint8_t max_txprob);
    void ReceiveTmrChunkAck();
+  void ReceiveIntro();
    void FlushData();
    uint32_t Send(uint8_t* data, uint16_t length);
    uint32_t Queue(const uint8_t* data, uint16_t length, bool is_sent);


_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to