Hi Vu,

See comments below.

Thanks

Minh

On 1/10/19 8:34 pm, Nguyen Minh Vu wrote:
Hi Minh,

Ack with minor comments. Thanks.

Regards, Vu

On 10/1/19 12:49 PM, Minh Chau wrote:
In the scenario of recovery from split-brain, where both
active director services may suffer mds message loss due
to lost-contact tipc link. If MDS_TIPC_FCTRL_ENABLED is
set, the out-of-order message will be dropped, and there
is no mechanism to trigger the retransmission from receiver
side at this moment (the retransmission is only triggered
from sender as result of TIPC_ERR_OVERLOAD).

In reception of disordered message, the receiver can send
not-acknowledgement to notify the sender for retransmission.
Therefore, the sender can trigger retransmisison in the same
way as receiving TIPC_ERR_OVERLOAD.

This patch adds Nack message for retransmission of disordered
message detected from receiver side.
---
  src/mds/mds_c_api.c              |  2 +-
  src/mds/mds_dt_common.c          |  2 +-
  src/mds/mds_tipc_fctrl_intf.cc   | 19 ++++++++++++++++++-
  src/mds/mds_tipc_fctrl_msg.cc    | 33 +++++++++++++++++++++++++++++++++
  src/mds/mds_tipc_fctrl_msg.h     | 22 ++++++++++++++++++++++
  src/mds/mds_tipc_fctrl_portid.cc | 18 +++++++++++++++++-
  src/mds/mds_tipc_fctrl_portid.h  |  1 +
  7 files changed, 93 insertions(+), 4 deletions(-)

diff --git a/src/mds/mds_c_api.c b/src/mds/mds_c_api.c
index c41c8dd..132555b 100644
--- a/src/mds/mds_c_api.c
+++ b/src/mds/mds_c_api.c
@@ -4196,7 +4196,7 @@ void mds_mcm_msg_loss(MDS_SVC_HDL local_svc_hdl, MDS_DEST rem_adest,
        /* Check whether the msg loss is enabled or not */
      if (true != local_svc_info->i_msg_loss_indication) {
-        m_MDS_LOG_INFO(" MSG loss not enbaled mds_mcm_msg_loss\n");
+        m_MDS_LOG_NOTIFY("MSG loss is not enabled mds_mcm_msg_loss\n");
          return;
      }
  diff --git a/src/mds/mds_dt_common.c b/src/mds/mds_dt_common.c
index 66652af..de13883 100644
--- a/src/mds/mds_dt_common.c
+++ b/src/mds/mds_dt_common.c
@@ -972,7 +972,7 @@ uint32_t mds_tmr_mailbox_processing(void)
                      .vdest_id);
                  break;
              case MDS_REASSEMBLY_TMR:
-                m_MDS_LOG_DBG(
+                m_MDS_LOG_ERR(
                      "MDTM: Tmr Mailbox Processing:Reassemble Tmr Hdl=0x%08x",
                      mbx_evt_info->info.tmr_info_hdl);
                  mdtm_process_reassem_timer_event(
diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index 2366672..65f1849 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -38,6 +38,7 @@ using mds::Timer;
  using mds::DataMessage;
  using mds::ChunkAck;
  using mds::HeaderMessage;
+using mds::Nack;
    namespace {
  // flow control enabled/disabled
@@ -142,7 +143,8 @@ uint32_t process_flow_event(const Event& evt) {
      if (evt.type_ == Event::Type::kEvtSendChunkAck) {
        portid->SendChunkAck(evt.fseq_, evt.svc_id_, evt.chunk_size_);
      }
-    if (evt.type_ == Event::Type::kEvtDropData) {
+    if (evt.type_ == Event::Type::kEvtDropData ||
+        evt.type_ == Event::Type::kEvtRcvNack) {
        portid->ReceiveNack(evt.mseq_, evt.mfrag_,
            evt.fseq_);
      }
@@ -464,6 +466,21 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
          // skip this data msg
          return NCSCC_RC_FAILURE;
        }
+      if (header.msg_type_ == Nack::kNackMsgType) {
+        // receive nack message
+        Nack nack;
+        nack.Decode(buffer);
+        // send to the event thread
+        if (m_NCS_IPC_SEND(&mbx_events,
+            new Event(Event::Type::kEvtRcvNack, id, nack.svc_id_,
+                header.mseq_, header.mfrag_, nack.nacked_fseq_),
+                NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
+          m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n");
+        }
+        // return NCSCC_RC_FAILURE, so the tipc receiving thread (legacy) will
+        // skip this data msg
+        return NCSCC_RC_FAILURE;
+      }
      } else {
        // receive data message
        DataMessage data;
diff --git a/src/mds/mds_tipc_fctrl_msg.cc b/src/mds/mds_tipc_fctrl_msg.cc
index 064d977..f85568c 100644
--- a/src/mds/mds_tipc_fctrl_msg.cc
+++ b/src/mds/mds_tipc_fctrl_msg.cc
@@ -139,4 +139,37 @@ void ChunkAck::Decode(uint8_t *msg) {
    chunk_size_ = ncs_decode_16bit(&ptr);
  }
  +
+Nack::Nack(uint16_t svc_id, uint16_t fseq):
+    svc_id_(svc_id), nacked_fseq_(fseq) {
+  msg_type_ = kNackMsgType;
+}
+
+void Nack::Encode(uint8_t *msg) {
+  uint8_t *ptr;
+  // encode protocol identifier
+  ptr = &msg[Nack::FieldIndex::kProtocolIdentifier];
+  ncs_encode_32bit(&ptr, MDS_PROT_FCTRL_ID);
+  // encode message type
+  ptr = &msg[Nack::FieldIndex::kFlowControlMessageType];
+  ncs_encode_8bit(&ptr, kNackMsgType);
+  // encode service id
+  ptr = &msg[Nack::FieldIndex::kServiceId];
+  ncs_encode_16bit(&ptr, svc_id_);
+  // encode flow control sequence number
+  ptr = &msg[Nack::FieldIndex::kFlowControlSequenceNumber];
+  ncs_encode_16bit(&ptr, nacked_fseq_);
+}
+
+void Nack::Decode(uint8_t *msg) {
+  uint8_t *ptr;
+
+  // decode service id
+  ptr = &msg[Nack::FieldIndex::kServiceId];
+  svc_id_ = ncs_decode_16bit(&ptr);
+  // decode flow control sequence number
+  ptr = &msg[Nack::FieldIndex::kFlowControlSequenceNumber];
+  nacked_fseq_ = ncs_decode_16bit(&ptr);
+}
+
  }  // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
index d67ed19..e1db200 100644
--- a/src/mds/mds_tipc_fctrl_msg.h
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -44,6 +44,7 @@ class Event {
                             // selective data msgs (not supported)
      kEvtDropData,          // event reported from tipc that a message is not
                             // delivered
+    kEvtRcvNack,           // event that received nack message
      kEvtTmrAll,
      kEvtTmrTxProb,    // event that tx probation timer expired for once
      kEvtTmrChunkAck,  // event to send the chunk ack
@@ -150,6 +151,27 @@ class ChunkAck: public BaseMessage {
    void Decode(uint8_t *msg) override;
  };
  +class Nack: public BaseMessage {
+ public:
+  enum FieldIndex {
+    kProtocolIdentifier = 11,
+    kFlowControlMessageType = 15,
+    kServiceId = 16,
+    kFlowControlSequenceNumber = 18,
+  };
+  static const uint8_t kNackMsgType = 2;
+  static const uint16_t kNackMsgLength = 20;
+
+  uint8_t msg_type_{0};
+  uint16_t svc_id_{0};
+  uint16_t nacked_fseq_{0};
+  Nack() {}
+  Nack(uint16_t svc_id, uint16_t fseq);
+  virtual ~Nack() {}
+  void Encode(uint8_t *msg) override;
+  void Decode(uint8_t *msg) override;
+};
+
  }  // end namespace mds
    #endif  // MDS_MDS_TIPC_FCTRL_MSG_H_
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 1ce792d..0e7c77f 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -238,6 +238,20 @@ void TipcPortId::SendChunkAck(uint16_t fseq, uint16_t svc_id,
        fseq, chksize);
  }
  +void TipcPortId::SendNack(uint16_t fseq, uint16_t svc_id) {
+  uint8_t data[Nack::kNackMsgLength];
[Vu] Does it need to memset the data before using?
[M] It does not need to, since the encoding will fill up exactly the kNackMsgLength of bytes, but I can set the @data to all 0 before encoding,  '= {0}' would do
+
+  HeaderMessage header(Nack::kNackMsgLength, 0, 0, fseq);
+  header.Encode(reinterpret_cast<uint8_t*>(&data));
[Vu] Can we pass the data directly without casting? e.g: header.Encode(data);
[M]: I think the cast was there for casting the header/message from old prototyped code, I will remove it for Nack and Ack message.
+
+  Nack nack(svc_id, fseq);
+  nack.Encode(reinterpret_cast<uint8_t*>(&data));
+  Send(reinterpret_cast<uint8_t*>(&data), Nack::kNackMsgLength);
+  m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+      "SndNack[fseq:%u]", id_.node, id_.ref, fseq);
+}
+
+
  uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
      uint16_t fseq, uint16_t svc_id) {
    uint32_t rc = NCSCC_RC_SUCCESS;
@@ -312,9 +326,11 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
              id_.node, id_.ref,
              mseq, mfrag, fseq,
              rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
+        // send nack
+        SendNack((rcvwnd_.rcv_ + Seq16(1)).v(), svc_id);
        }
      }
-    if (Seq16(fseq) <= rcvwnd_.acked_) {
+    if (Seq16(fseq) <= rcvwnd_.rcv_) {
        rc = NCSCC_RC_FAILURE;
        // unexpected retransmission
        m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h
index cf2daaa..89a71d3 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -133,6 +133,7 @@ class TipcPortId {
    bool ReceiveCapable(uint16_t sending_len);
    void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size);
    void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size);
+  void SendNack(uint16_t fseq, uint16_t svc_id);
    uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
        uint16_t fseq, uint16_t svc_id);
    void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);




_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to