From: Gerd Rausch <[email protected]>

Instead of just blocking the sender until "c_npaths" is known
(it gets updated upon receipt of an MPRDS PONG message),
simply use the first lane (cp_index#0).
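In code, the non-blocking lane selection amounts to the following
(a condensed sketch of the rds_sendmsg() hunk further below; hashing
over a single path always yields lane 0):

    /* While the handshake is still pending (c_npaths == 0), hash
     * over a single path so every message lands on cp_index#0.
     */
    if (conn->c_trans->t_mp_capable)
            cpath = &conn->c_path[RDS_MPATH_HASH(rs, conn->c_npaths ? : 1)];
    else
            cpath = &conn->c_path[0];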

But just using the first lane isn't enough.

As soon as we enqueue messages on a different lane, we'd run the risk
of out-of-order delivery of RDS messages.

Earlier messages enqueued on "cp_index == 0" could be delivered later
than more recent messages enqueued on "cp_index > 0", mostly because
possible head-of-line blocking can make the first lane slower.

To avoid that, we simply take a snapshot of "cp_next_tx_seq" at the
time we're about to fan out to more lanes.
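Concretely, the snapshot is taken while processing the handshake
extension headers, under lane 0's cp_lock (a condensed sketch of the
recv.c hunk below):

    /* On the handshake that announces more than one path, remember
     * how far lane 0 has gotten.  Every sequence number below this
     * snapshot must be delivered via cp_index#0 first.
     */
    if (new_npaths > 1 && new_npaths != conn->c_npaths) {
            struct rds_conn_path *cp0 = conn->c_path;

            spin_lock_irqsave(&cp0->cp_lock, flags);
            conn->c_cp0_mprds_catchup_tx_seq = cp0->cp_next_tx_seq;
            spin_unlock_irqrestore(&cp0->cp_lock, flags);
    }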

Then we delay the transmission of messages enqueued on other lanes
with "cp_index > 0" until cp_index#0 has caught up with the delivery of
new messages (from "cp_send_queue") as well as in-flight
messages (from "cp_retrans") that have not yet been acknowledged
by the receiver.
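In rds_send_xmit(), a lane with "cp_index > 0" therefore peeks at the
oldest message on both of lane 0's queues and backs off if either is
still behind the snapshot (a condensed sketch of the send.c hunk below,
with locking elided; rm0 stands for the oldest entry of cp_retrans or
cp_send_queue):

    /* Bail out of this lane's batch if lane 0 still holds a message
     * older than the snapshot; the worker will retry later.
     */
    if (rm0 && be64_to_cpu(rm0->m_inc.i_hdr.h_sequence) <
        conn->c_cp0_mprds_catchup_tx_seq) {
            rds_stats_inc(s_mprds_catchup_tx0_retries);
            goto over_batch;
    }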

We also add a new counter "mprds_catchup_tx0_retries" to keep track
of how many times "rds_send_xmit" had to suspend activity
because it was waiting for the first lane to catch up.

Signed-off-by: Gerd Rausch <[email protected]>
Signed-off-by: Allison Henderson <[email protected]>
---
 net/rds/rds.h   |  3 ++
 net/rds/recv.c  | 23 +++++++++++--
 net/rds/send.c  | 91 ++++++++++++++++++++++++++++++-------------------
 net/rds/stats.c |  1 +
 4 files changed, 79 insertions(+), 39 deletions(-)

diff --git a/net/rds/rds.h b/net/rds/rds.h
index d942057b91ee4..f78477912e7c1 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -170,6 +170,8 @@ struct rds_connection {
 
        u32                     c_my_gen_num;
        u32                     c_peer_gen_num;
+
+       u64                     c_cp0_mprds_catchup_tx_seq;
 };
 
 static inline
@@ -748,6 +750,7 @@ struct rds_statistics {
        uint64_t        s_recv_bytes_added_to_socket;
        uint64_t        s_recv_bytes_removed_from_socket;
        uint64_t        s_send_stuck_rm;
+       uint64_t        s_mprds_catchup_tx0_retries;
 };
 
 /* af_rds.c */
diff --git a/net/rds/recv.c b/net/rds/recv.c
index ddf128a023470..889a5b7935e5c 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -208,6 +208,9 @@ static void rds_recv_hs_exthdrs(struct rds_header *hdr,
        } buffer;
        bool new_with_sport_idx = false;
        u32 new_peer_gen_num = 0;
+       int new_npaths;
+
+       new_npaths = conn->c_npaths;
 
        while (1) {
                len = sizeof(buffer);
@@ -217,8 +220,8 @@ static void rds_recv_hs_exthdrs(struct rds_header *hdr,
                /* Process extension header here */
                switch (type) {
                case RDS_EXTHDR_NPATHS:
-                       conn->c_npaths = min_t(int, RDS_MPATH_WORKERS,
-                                              be16_to_cpu(buffer.rds_npaths));
+                       new_npaths = min_t(int, RDS_MPATH_WORKERS,
+                                          be16_to_cpu(buffer.rds_npaths));
                        break;
                case RDS_EXTHDR_GEN_NUM:
                        new_peer_gen_num = be32_to_cpu(buffer.rds_gen_num);
@@ -233,8 +236,22 @@ static void rds_recv_hs_exthdrs(struct rds_header *hdr,
        }
 
        conn->c_with_sport_idx = new_with_sport_idx;
+
+       if (new_npaths > 1 && new_npaths != conn->c_npaths) {
+               /* We're about to fan-out.
+                * Make sure that messages from cp_index#0
+                * are sent prior to handling other lanes.
+                */
+               struct rds_conn_path *cp0 = conn->c_path;
+               unsigned long flags;
+
+               spin_lock_irqsave(&cp0->cp_lock, flags);
+               conn->c_cp0_mprds_catchup_tx_seq = cp0->cp_next_tx_seq;
+               spin_unlock_irqrestore(&cp0->cp_lock, flags);
+       }
        /* if RDS_EXTHDR_NPATHS was not found, default to a single-path */
-       conn->c_npaths = max_t(int, conn->c_npaths, 1);
+       conn->c_npaths = max_t(int, new_npaths, 1);
+
        conn->c_ping_triggered = 0;
        rds_conn_peer_gen_update(conn, new_peer_gen_num);
 
diff --git a/net/rds/send.c b/net/rds/send.c
index 85e1c5352ad80..ea3b57e9191b1 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -137,6 +137,8 @@ int rds_send_xmit(struct rds_conn_path *cp)
 {
        struct rds_connection *conn = cp->cp_conn;
        struct rds_message *rm;
+       struct rds_conn_path *cp0 = conn->c_path;
+       struct rds_message *rm0;
        unsigned long flags;
        unsigned int tmp;
        struct scatterlist *sg;
@@ -248,6 +250,52 @@ int rds_send_xmit(struct rds_conn_path *cp)
                        if (batch_count >= send_batch_count)
                                goto over_batch;
 
+                       if (cp->cp_index > 0) {
+                               /* make sure cp_index#0 caught up during fan-out
+                                * in order to avoid lane races
+                                */
+
+                               spin_lock_irqsave(&cp0->cp_lock, flags);
+
+                               /* the oldest / first message in the retransmit queue
+                                * has to be at or beyond c_cp0_mprds_catchup_tx_seq
+                                */
+                               if (!list_empty(&cp0->cp_retrans)) {
+                                       rm0 = list_entry(cp0->cp_retrans.next,
+                                                        struct rds_message,
+                                                        m_conn_item);
+                                       if (be64_to_cpu(rm0->m_inc.i_hdr.h_sequence) <
+                                           conn->c_cp0_mprds_catchup_tx_seq) {
+                                               /* the retransmit queue of cp_index#0 has not
+                                                * quite caught up yet
+                                                */
+                                               spin_unlock_irqrestore(&cp0->cp_lock, flags);
+                                               rds_stats_inc(s_mprds_catchup_tx0_retries);
+                                               goto over_batch;
+                                       }
+                               }
+
+                               /* the oldest / first message of the send queue
+                                * has to be at or beyond c_cp0_mprds_catchup_tx_seq
+                                */
+                               rm0 = cp0->cp_xmit_rm;
+                               if (!rm0 && !list_empty(&cp0->cp_send_queue))
+                                       rm0 = list_entry(cp0->cp_send_queue.next,
+                                                        struct rds_message,
+                                                        m_conn_item);
+                               if (rm0 && be64_to_cpu(rm0->m_inc.i_hdr.h_sequence) <
+                                   conn->c_cp0_mprds_catchup_tx_seq) {
+                                       /* the send queue of cp_index#0 has not quite
+                                        * caught up yet
+                                        */
+                                       spin_unlock_irqrestore(&cp0->cp_lock, flags);
+                                       rds_stats_inc(s_mprds_catchup_tx0_retries);
+                                       goto over_batch;
+                               }
+
+                               spin_unlock_irqrestore(&cp0->cp_lock, flags);
+                       }
+
                        spin_lock_irqsave(&cp->cp_lock, flags);
 
                        if (!list_empty(&cp->cp_send_queue)) {
@@ -1042,39 +1090,6 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
        return ret;
 }
 
-static int rds_send_mprds_hash(struct rds_sock *rs,
-                              struct rds_connection *conn, int nonblock)
-{
-       int hash;
-
-       if (conn->c_npaths == 0)
-               hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
-       else
-               hash = RDS_MPATH_HASH(rs, conn->c_npaths);
-       if (conn->c_npaths == 0 && hash != 0) {
-               rds_send_ping(conn, 0);
-
-               /* The underlying connection is not up yet.  Need to wait
-                * until it is up to be sure that the non-zero c_path can be
-                * used.  But if we are interrupted, we have to use the zero
-                * c_path in case the connection ends up being non-MP capable.
-                */
-               if (conn->c_npaths == 0) {
-                       /* Cannot wait for the connection be made, so just use
-                        * the base c_path.
-                        */
-                       if (nonblock)
-                               return 0;
-                       if (wait_event_interruptible(conn->c_hs_waitq,
-                                                    conn->c_npaths != 0))
-                               hash = 0;
-               }
-               if (conn->c_npaths == 1)
-                       hash = 0;
-       }
-       return hash;
-}
-
 static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes)
 {
        struct rds_rdma_args *args;
@@ -1304,10 +1319,14 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
                rs->rs_conn = conn;
        }
 
-       if (conn->c_trans->t_mp_capable)
-               cpath = &conn->c_path[rds_send_mprds_hash(rs, conn, nonblock)];
-       else
+       if (conn->c_trans->t_mp_capable) {
+               /* Use c_path[0] until we learn that
+                * the peer supports more (c_npaths > 1)
+                */
+               cpath = &conn->c_path[RDS_MPATH_HASH(rs, conn->c_npaths ? : 1)];
+       } else {
                cpath = &conn->c_path[0];
+       }
 
        rm->m_conn_path = cpath;
 
diff --git a/net/rds/stats.c b/net/rds/stats.c
index cb2e3d2cdf738..24ee22d09e8cf 100644
--- a/net/rds/stats.c
+++ b/net/rds/stats.c
@@ -79,6 +79,7 @@ static const char *const rds_stat_names[] = {
        "recv_bytes_added_to_sock",
        "recv_bytes_freed_fromsock",
        "send_stuck_rm",
+       "mprds_catchup_tx0_retries",
 };
 
 void rds_stats_info_copy(struct rds_info_iterator *iter,
-- 
2.43.0

