Hi Minh,

I have minor comments below.

Regards, Vu

On 8/14/19 1:38 PM, Minh Chau wrote:
If the ack size is configured greater than 1, there should be a timeout
at receiver ends to send the ack message back to senders.
The ack message timeout utilizes the poll timeout in flow control thread
to make mds lightweight (in contrast to additional timer threads).
---
  src/mds/mds_tipc_fctrl_intf.cc   | 33 ++++++++++++++++++++++++++++++---
  src/mds/mds_tipc_fctrl_msg.h     |  6 ++++++
  src/mds/mds_tipc_fctrl_portid.cc | 15 +++++++++++++++
  src/mds/mds_tipc_fctrl_portid.h  |  1 +
  4 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index 91b9107..bd0a8f6 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -66,7 +66,8 @@ std::map<uint64_t, TipcPortId*> portid_map;
  std::mutex portid_map_mutex;
// chunk ack parameters
-// todo: The chunk ack size should be configurable
+// todo: The chunk ack timeout and chunk ack size should be configurable
+int kChunkAckTimeout = 1000;  // in miliseconds
  uint16_t kChunkAckSize = 3;
TipcPortId* portid_lookup(struct tipc_portid id) {
@@ -75,6 +76,15 @@ TipcPortId* portid_lookup(struct tipc_portid id) {
    return portid_map[uid];
  }
+void process_timer_event(const Event evt) {
+  for (auto i : portid_map) {
+    TipcPortId* portid = i.second;
+    if (evt.type_ == Event::Type::kEvtTmrChunkAck) {
+      portid->ReceiveTmrChunkAck();
+    }
+  }
+}
+
  uint32_t process_flow_event(const Event evt) {
    uint32_t rc = NCSCC_RC_SUCCESS;
    TipcPortId *portid = portid_lookup(evt.id_);
@@ -110,7 +120,7 @@ uint32_t process_flow_event(const Event evt) {
  uint32_t process_all_events(void) {
    enum { FD_FCTRL = 0, NUM_FDS };
- int poll_tmo = MDTM_TIPC_POLL_TIMEOUT;
+  int poll_tmo = kChunkAckTimeout;
    while (true) {
      int pollres;
      struct pollfd pfd[NUM_FDS] = {{0}};
@@ -135,11 +145,24 @@ uint32_t process_all_events(void) {
          if (evt == nullptr) continue;
portid_map_mutex.lock();
-        process_flow_event(*evt);
+
+        if (evt->IsTimerEvent()) {
+          process_timer_event(*evt);
+        }
+        if (evt->IsFlowEvent()) {
+          process_flow_event(*evt);
+        }
+
[Vu] Should we log something here (e.g. a warning) if the event is neither a timer event nor a flow event? Silently dropping an unrecognized event would make debugging harder.
          delete evt;
          portid_map_mutex.unlock();
        }
      }
+    // timeout, scan all portid and send ack msgs
+    if (pollres == 0) {
+      portid_map_mutex.lock();
+      process_timer_event(Event(Event::Type::kEvtTmrChunkAck));
+      portid_map_mutex.unlock();
+    }
    }  /* while */
    return NCSCC_RC_SUCCESS;
  }
@@ -368,6 +391,10 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t 
len,
        portid_map_mutex.lock();
        uint32_t rc = process_flow_event(Event(Event::Type::kEvtRcvData,
            id, data.svc_id_, header.mseq_, header.mfrag_, header.fseq_));
+      if (rc == NCSCC_RC_CONTINUE) {
+        process_timer_event(Event(Event::Type::kEvtTmrChunkAck));
[Vu] The mutex is not unlocked on this path — if process_timer_event() throws or an early return is added later, portid_map_mutex stays locked. Consider std::lock_guard instead of explicit lock()/unlock().
+        rc = NCSCC_RC_SUCCESS;
+      }
        portid_map_mutex.unlock();
        return rc;
      }
diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
index 677f256..8e6a874 100644
--- a/src/mds/mds_tipc_fctrl_msg.h
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -44,6 +44,8 @@ class Event {
                             // selective data msgs (not supported)
      kEvtDropData,          // event reported from tipc that a message is not
                             // delivered
+    kEvtTmrAll,
+    kEvtTmrChunkAck,  // event to send the chunk ack
    };
    NCS_IPC_MSG next_{0};
    Type type_;
@@ -68,6 +70,10 @@ class Event {
      fseq_(f_seg_num), chunk_size_(chunk_size) {
      type_ = type;
    }
+  bool IsTimerEvent() { return (type_ > Type::kEvtTmrAll); }
+  bool IsFlowEvent() {
+    return (Type::kEvtDataFlowAll < type_ && type_ < Type::kEvtTmrAll);
+  }
[Vu] Consider declaring these as const member functions (e.g. `bool IsTimerEvent() const`), since they do not modify any member variables.
  };
class BaseMessage {
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 24d13ee..64115d5 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -67,6 +67,8 @@ TipcPortId::TipcPortId(struct tipc_portid id, int sock, 
uint16_t chksize,
  }
TipcPortId::~TipcPortId() {
+  // Fake a TmrChunkAck event to ack all received messages
+  ReceiveTmrChunkAck();
    // clear all msg in sndqueue_
    sndqueue_.Clear();
  }
@@ -156,6 +158,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t 
mfrag,
        // send ack for @chunk_size_ msgs starting from fseq
        SendChunkAck(fseq, svc_id, chunk_size_);
        rcvwnd_.acked_ = rcvwnd_.rcv_;
+      rc = NCSCC_RC_CONTINUE;
      }
    } else {
      // todo: update rcvwnd_.nacked_space_.
@@ -258,4 +261,16 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
    }
  }
+void TipcPortId::ReceiveTmrChunkAck() {
+  uint16_t chksize = rcvwnd_.rcv_ - rcvwnd_.acked_;
+  if (chksize > 0) {
+    m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u], "
+        "ChkAckExp",
+        id_.node, id_.ref);
+    // send ack for @chksize msgs starting from rcvwnd_.rcv_
+    SendChunkAck(rcvwnd_.rcv_, 0, chksize);
+    rcvwnd_.acked_ = rcvwnd_.rcv_;
+  }
+}
+
  }  // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h
index 8068e6e..99beaaf 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -53,6 +53,7 @@ class TipcPortId {
    uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
        uint16_t fseq, uint16_t svc_id);
    void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
+  void ReceiveTmrChunkAck();
    uint32_t Send(uint8_t* data, uint16_t length);
    uint32_t Queue(const uint8_t* data, uint16_t length);



_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to