Hi Thuan,

I can rename it as "Intro" message, then the rcvwnd counter shall be removed.

This new message can not replace the tx prob timer. This new message is to speed up the determinatin of flow control at the peer side rather than mds data message. It is needed for the flow control sender 'talk' with the non-flow-control receiver who will not send any ack back.

THanks,

Minh

On 14/10/19 7:06 pm, Tran Thuan wrote:
Hi bro.Minh,

Thanks for explanation.
I think the "reset" message should be rename to "introduce" message.
Another question: with this fix, will tx probation timer become redundant or 
still useful in somehow?

Best Regards,
ThuanTr

-----Original Message-----
From: Minh Hon Chau <minh.c...@dektech.com.au>
Sent: Monday, October 14, 2019 1:01 PM
To: Tran Thuan <thuan.t...@dektech.com.au>; hans.nordeb...@ericsson.com; 
gary....@dektech.com.au; vu.m.ngu...@dektech.com.au
Cc: opensaf-devel@lists.sourceforge.net
Subject: Re: [PATCH 1/1] mds: Add Reset message [#3090]

Hi Thuan,

If the chunkack is configured to send after a few data messages, then the 
sender is not getting any chunkack for the first message from receiver until 
chunkack timeout (which is also configurable to be a bit larger value). Then, 
the probation timer would be timeout at sender.

The rcvwnd.acked_ will be fixed.

Thanks

Minh

On 14/10/19 4:39 pm, Tran Thuan wrote:
Hi bro.Minh,

- In my understanding, tx probation timer only start when sender send
first message.
Then sender relies on chunkAck to know receiver support MDS FCTRL or
timeout as not support.
But from what you describe, sender got tx probation timer timeout
before sending first message?
Or after sending first message but sender cannot get any chunkAck somehow?
I am confused this point. Could you help explain?

- About the code, mistake set '0' twice for .acked_ in
TipcPortId::ReceiveReset()

Best Regards,
ThuanTr

-----Original Message-----
From: Minh Chau <minh.c...@dektech.com.au>
Sent: Friday, October 11, 2019 10:52 AM
To: hans.nordeb...@ericsson.com; gary....@dektech.com.au;
vu.m.ngu...@dektech.com.au; thuan.t...@dektech.com.au
Cc: opensaf-devel@lists.sourceforge.net; Minh Chau
<minh.c...@dektech.com.au>
Subject: [PATCH 1/1] mds: Add Reset message [#3090]

mds relies on data message sent from the peer to determine whether the
MDS_TIPC_FCTRL_ENABLED is set. The data message may not be sent right
after TIPC_PUBLISHED event, which can cause the tx probation timer timeout.

This patch add Reset message, which is sent right after the
TIPC_PUBLISHED to help mds determine the flow control supported at the peer 
earlier.
---
   src/mds/mds_main.c               |  2 +-
   src/mds/mds_tipc_fctrl_intf.cc   | 27 ++++++++++++++++++++++
   src/mds/mds_tipc_fctrl_msg.cc    | 11 +++++++++
   src/mds/mds_tipc_fctrl_msg.h     | 18 +++++++++++++++
   src/mds/mds_tipc_fctrl_portid.cc | 49
++++++++++++++++++++++++++++++----------
   src/mds/mds_tipc_fctrl_portid.h  |  2 ++
   6 files changed, 96 insertions(+), 13 deletions(-)

diff --git a/src/mds/mds_main.c b/src/mds/mds_main.c index
8c9b1f1..c7d2f7b
100644
--- a/src/mds/mds_main.c
+++ b/src/mds/mds_main.c
@@ -408,7 +408,7 @@ uint32_t mds_lib_req(NCS_LIB_REQ_INFO *req)
                                if (tipc_mcast_enabled != false)
                                        tipc_mcast_enabled = true;
- m_MDS_LOG_DBG(
+                               m_MDS_LOG_NOTIFY(
                                    "MDS: TIPC_MCAST_ENABLED: %d  Set argument 
\n",
                                    tipc_mcast_enabled);
                        }
diff --git a/src/mds/mds_tipc_fctrl_intf.cc
b/src/mds/mds_tipc_fctrl_intf.cc index 6271890..e8c9121 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -39,6 +39,7 @@ using mds::DataMessage;  using mds::ChunkAck;  using
mds::HeaderMessage;  using mds::Nack;
+using mds::Reset;
namespace {
   // flow control enabled/disabled
@@ -124,12 +125,20 @@ uint32_t process_flow_event(const Event& evt) {
     uint32_t rc = NCSCC_RC_SUCCESS;
     TipcPortId *portid = portid_lookup(evt.id_);
     if (portid == nullptr) {
+    // the null portid normally should not happen; however because the
+    // tipc_cb.Dsock and tipc_cb.BSRsock are separated; the data message
+    // sent from BSRsock may come before reception of TIPC_PUBLISHED
       if (evt.type_ == Event::Type::kEvtRcvData) {
         portid = new TipcPortId(evt.id_, data_sock_fd,
             kChunkAckSize, sock_buf_size);
         portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
         rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
               evt.fseq_, evt.svc_id_);
+    } else if (evt.type_ == Event::Type::kEvtRcvReset) {
+      portid = new TipcPortId(evt.id_, data_sock_fd,
+          kChunkAckSize, sock_buf_size);
+      portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
+      portid->ReceiveReset();
       } else {
         m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
             "RcvEvt[evt:%d], Error[PortId not found]", @@ -151,6
+160,9 @@ uint32_t process_flow_event(const Event& evt) {
         portid->ReceiveNack(evt.mseq_, evt.mfrag_,
             evt.fseq_);
       }
+    if (evt.type_ == Event::Type::kEvtRcvReset) {
+      portid->ReceiveReset();
+    }
     }
     return rc;
   }
@@ -489,6 +501,16 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer,
uint16_t len,
             m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events,
Error[%s]",
                 strerror(errno));
           }
+      } else if (header.msg_type_ == Reset::kResetMsgType) {
+        // no need to decode reset message
+        // the decoding reset message type is done in header decoding
+        // send to the event thread
+        if (m_NCS_IPC_SEND(&mbx_events,
+            new Event(Event::Type::kEvtRcvReset, id, 0, 0, 0, 0),
+                NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
+          m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events,
Error[%s]",
+              strerror(errno));
+        }
         } else {
           m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
               "[msg_type:%u], Error[not supported message type]", @@
-516,6
+538,11 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t
+len,
         portid_map_mutex.unlock();
         return rc;
       }
+  } else {
+    m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+        "Receive non-flow-control data message, "
+        "header.pro_ver:%u",
+        id.node, id.ref, header.pro_ver_);
     }
     return NCSCC_RC_SUCCESS;
   }
diff --git a/src/mds/mds_tipc_fctrl_msg.cc
b/src/mds/mds_tipc_fctrl_msg.cc index 932120f..4aba3fb 100644
--- a/src/mds/mds_tipc_fctrl_msg.cc
+++ b/src/mds/mds_tipc_fctrl_msg.cc
@@ -178,4 +178,15 @@ void Nack::Decode(uint8_t *msg) {
     nacked_fseq_ = ncs_decode_16bit(&ptr);  }
+
+void Reset::Encode(uint8_t *msg) {
+  uint8_t *ptr;
+  // encode protocol identifier
+  ptr = &msg[Reset::FieldIndex::kProtocolIdentifier];
+  ncs_encode_32bit(&ptr, MDS_PROT_FCTRL_ID);
+  // encode message type
+  ptr = &msg[Reset::FieldIndex::kFlowControlMessageType];
+  ncs_encode_8bit(&ptr, kResetMsgType); }
+
   }  // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_msg.h
b/src/mds/mds_tipc_fctrl_msg.h index e1db200..e94fb9d 100644
--- a/src/mds/mds_tipc_fctrl_msg.h
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -45,6 +45,7 @@ class Event {
       kEvtDropData,          // event reported from tipc that a message is
not
                              // delivered
       kEvtRcvNack,           // event that received nack message
+    kEvtRcvReset,          // event that received reset message
       kEvtTmrAll,
       kEvtTmrTxProb,    // event that tx probation timer expired for once
       kEvtTmrChunkAck,  // event to send the chunk ack @@ -172,6
+173,23 @@ class Nack: public BaseMessage {
     void Decode(uint8_t *msg) override;
   };
+class Reset: public BaseMessage {
+ public:
+  enum FieldIndex {
+    kProtocolIdentifier = 11,
+    kFlowControlMessageType = 15,
+  };
+  static const uint8_t kResetMsgType = 3;
+  static const uint16_t kResetMsgLength = 16;
+
+  uint8_t msg_type_{0};
+
+  Reset() { msg_type_ = kResetMsgType; }
+  virtual ~Reset() {}
+  void Encode(uint8_t *msg) override; };
+
+
   }  // end namespace mds
#endif // MDS_MDS_TIPC_FCTRL_MSG_H_ diff --git
a/src/mds/mds_tipc_fctrl_portid.cc
b/src/mds/mds_tipc_fctrl_portid.cc
index 24a7e2a..f195b01 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -112,6 +112,7 @@ TipcPortId::TipcPortId(struct tipc_portid id, int
sock, uint16_t chksize,
       uint64_t sock_buf_size):
     id_(id), bsrsock_(sock), chunk_size_(chksize),
rcv_buf_size_(sock_buf_size) {
     state_ = State::kStartup;
+  SendReset();
   }
TipcPortId::~TipcPortId() {
@@ -189,7 +190,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data,
uint16_t length,
           sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
     } else {
       ++sndwnd_.send_;
-    m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+    m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
           "QueData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
           "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
           id_.node, id_.ref,
@@ -248,10 +249,22 @@ void TipcPortId::SendNack(uint16_t fseq,
uint16_t
svc_id) {
     Nack nack(svc_id, fseq);
     nack.Encode(data);
     Send(data, Nack::kNackMsgLength);
-  m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
         "SndNack[fseq:%u]", id_.node, id_.ref, fseq);  }
+void TipcPortId::SendReset() {
+  uint8_t data[Reset::kResetMsgLength] = {0};
+
+  HeaderMessage header(Reset::kResetMsgLength, 0, 0, 0);
+ header.Encode(data);
+
+  Reset reset;
+  reset.Encode(data);
+  Send(data, Reset::kResetMsgLength);
+  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
+      "SndReset", id_.node, id_.ref); }
uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
       uint16_t fseq, uint16_t svc_id) { @@ -330,8 +343,7 @@ uint32_t
TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
           // send nack
           SendNack((rcvwnd_.rcv_ + Seq16(1)).v(), svc_id);
         }
-    }
-    if (Seq16(fseq) <= rcvwnd_.rcv_) {
+    } else if (Seq16(fseq) <= rcvwnd_.rcv_) {
         rc = NCSCC_RC_FAILURE;
         // unexpected retransmission
         m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -399,7 +411,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq,
uint16_t
chksize) {
               sndwnd_.nacked_space_ += msg->header_.msg_len_;
               msg->is_sent_ = true;
               resend_bytes += msg->header_.msg_len_;
-            m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+            m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
                   "SndQData[fseq:%u, len:%u], "
                   "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
                   id_.node, id_.ref,
@@ -443,7 +455,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq,
uint16_t mfrag,
     }
     if (state_ == State::kRcvBuffOverflow) {
       m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
-        "RcvNack[fseq:%u, state:%u]"
+        "RcvNack[fseq:%u, state:%u], "
           "Warning[Ignore Nack]",
           id_.node, id_.ref,
           fseq, (uint8_t)state_);
@@ -462,7 +474,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq,
uint16_t mfrag,
       if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
         msg->is_sent_ = true;
       }
-    m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+    m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
           "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
           "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
           id_.node, id_.ref,
@@ -495,12 +507,11 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t
max_txprob) {
       // receiver is at old mds version
       if (state_ == State::kDisabled) {
         FlushData();
+      m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
+          "TxProbExp, TxProb[retries:%u, state:%u]",
+          id_.node, id_.ref,
+          txprob_cnt_, (uint8_t)state_);
       }
-
-    m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
-        "TxProbExp, TxProb[retries:%u, state:%u]",
-        id_.node, id_.ref,
-        txprob_cnt_, (uint8_t)state_);
     }
     return restart_txprob;
   }
@@ -518,4 +529,18 @@ void TipcPortId::ReceiveTmrChunkAck() {
     }
   }
+void TipcPortId::ReceiveReset() {
+  m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
+      "RcvReset, "
+      "TxProb[retries:%u, state:%u]",
+      id_.node, id_.ref,
+      txprob_cnt_, (uint8_t)state_);
+  if (state_ == State::kStartup || state_ == State::kTxProb) {
+    state_ = State::kEnabled;
+  }
+  rcvwnd_.acked_ = 0;
+  rcvwnd_.rcv_ = 0;
+  rcvwnd_.acked_ = 0;
+}
+
   }  // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_portid.h
b/src/mds/mds_tipc_fctrl_portid.h index d238ac6..ffed2ad 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -134,11 +134,13 @@ class TipcPortId {
     void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size);
     void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size);
     void SendNack(uint16_t fseq, uint16_t svc_id);
+  void SendReset();
     uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
         uint16_t fseq, uint16_t svc_id);
     void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
     bool ReceiveTmrTxProb(uint8_t max_txprob);
     void ReceiveTmrChunkAck();
+  void ReceiveReset();
     void FlushData();
     uint32_t Send(uint8_t* data, uint16_t length);
     uint32_t Queue(const uint8_t* data, uint16_t length, bool
is_sent);
--
2.7.4






_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to