Currently ipa_msg_recv() fails when messages are received partially.

This patch provides a new function ipa_msg_recv_buffered() that takes
an additional struct msgb ** argument (tmp_msg) pointing to a message
buffer in which partial data is stored.  When only part of a message
has been received, -EAGAIN is returned. If NULL is passed for tmp_msg,
the function behaves similarly to ipa_msg_recv() and fails on a partial
read. In addition, in case of errors the return value is now always
-EXXX and the content of errno is undefined.

Note that this feature needs support by the calling code insofar that
*tmp_msg must be set to NULL initially and it must be freed and
set to NULL manually when the socket is closed.

Note also that ipa_msg_recv() is then a wrapper around
ipa_msg_recv_buffered() which mimics the old error behaviour by
explicitly setting errno to -rc and returning -1 when an error has
happened.

Ticket: OW#728
Sponsored-by: On-Waves ehf
---
 TODO-RELEASE                    |    1 +
 include/osmocom/abis/e1_input.h |    2 +
 include/osmocom/abis/ipa.h      |    3 +
 src/input/ipa.c                 |  151 ++++++++++++++++++++++++++++++---------
 src/input/ipaccess.c            |   13 +++-
 tests/ipa_recv/ipa_recv_test.c  |   55 +++++++++-----
 tests/ipa_recv/ipa_recv_test.ok |   27 ++++++-
 7 files changed, 198 insertions(+), 54 deletions(-)

diff --git a/TODO-RELEASE b/TODO-RELEASE
index 43b1e8e..71931db 100644
--- a/TODO-RELEASE
+++ b/TODO-RELEASE
@@ -1 +1,2 @@
 #library       what            description / commit summary line
+libosmoabis    abi-change      ipa: Change ipa_msg_recv() to support partial receive
diff --git a/include/osmocom/abis/e1_input.h b/include/osmocom/abis/e1_input.h
index 9b77893..cf8677b 100644
--- a/include/osmocom/abis/e1_input.h
+++ b/include/osmocom/abis/e1_input.h
@@ -109,6 +109,8 @@ struct e1inp_ts {
                        struct osmo_fd fd;
                } rs232;
        } driver;
+
+       struct msgb *pending_msg;
 };
 
 struct gsm_e1_subslot {
diff --git a/include/osmocom/abis/ipa.h b/include/osmocom/abis/ipa.h
index d577d74..982b694 100644
--- a/include/osmocom/abis/ipa.h
+++ b/include/osmocom/abis/ipa.h
@@ -27,6 +27,7 @@ struct ipa_server_conn {
        int (*closed_cb)(struct ipa_server_conn *peer);
        int (*cb)(struct ipa_server_conn *peer, struct msgb *msg);
        void                            *data;
+       struct msgb                     *pending_msg;
 };
 
 struct ipa_server_conn *ipa_server_conn_create(void *ctx, struct 
ipa_server_link *link, int fd, int (*cb)(struct ipa_server_conn *peer, struct 
msgb *msg), int (*closed_cb)(struct ipa_server_conn *peer), void *data);
@@ -53,6 +54,7 @@ struct ipa_client_conn {
        int (*read_cb)(struct ipa_client_conn *link, struct msgb *msg);
        int (*write_cb)(struct ipa_client_conn *link);
        void                            *data;
+       struct msgb                     *pending_msg;
 };
 
 struct ipa_client_conn *ipa_client_conn_create(void *ctx, struct e1inp_ts *ts, 
int priv_nr, const char *addr, uint16_t port, void (*updown)(struct 
ipa_client_conn *link, int), int (*read_cb)(struct ipa_client_conn *link, 
struct msgb *msgb), int (*write_cb)(struct ipa_client_conn *link), void *data);
@@ -64,6 +66,7 @@ void ipa_client_conn_close(struct ipa_client_conn *link);
 void ipa_client_conn_send(struct ipa_client_conn *link, struct msgb *msg);
 
 int ipa_msg_recv(int fd, struct msgb **rmsg);
+int ipa_msg_recv_buffered(int fd, struct msgb **rmsg, struct msgb **tmp_msg);
 
 int ipaccess_rcvmsg_base(struct msgb *msg, struct osmo_fd *bfd);
 
diff --git a/src/input/ipa.c b/src/input/ipa.c
index b5abd36..71e1227 100644
--- a/src/input/ipa.c
+++ b/src/input/ipa.c
@@ -49,50 +49,130 @@ void ipa_msg_push_header(struct msgb *msg, uint8_t proto)
 
 int ipa_msg_recv(int fd, struct msgb **rmsg)
 {
-       struct msgb *msg;
+       int rc = ipa_msg_recv_buffered(fd, rmsg, NULL);
+       if (rc < 0) {
+               errno = -rc;
+               rc = -1;
+       }
+       return rc;
+}
+
+int ipa_msg_recv_buffered(int fd, struct msgb **rmsg, struct msgb **tmp_msg)
+{
+       struct msgb *msg = tmp_msg ? *tmp_msg : NULL;
        struct ipaccess_head *hh;
        int len, ret;
+       int needed;
 
-       msg = ipa_msg_alloc(0);
        if (msg == NULL)
-               return -ENOMEM;
+               msg = ipa_msg_alloc(0);
 
-       /* first read our 3-byte header */
-       hh = (struct ipaccess_head *) msg->data;
-       ret = recv(fd, msg->data, sizeof(*hh), 0);
-       if (ret <= 0) {
-               msgb_free(msg);
-               return ret;
-       } else if (ret != sizeof(*hh)) {
-               LOGP(DLINP, LOGL_ERROR, "too small message received\n");
-               msgb_free(msg);
-               return -EIO;
+       if (msg == NULL) {
+               ret = -ENOMEM;
+               goto discard_msg;
        }
-       msgb_put(msg, ret);
+
+       if (msg->l2h == NULL) {
+               /* first read our 3-byte header */
+               needed = sizeof(*hh) - msg->len;
+               ret = recv(fd, msg->tail, needed, 0);
+               if (ret == 0)
+                      goto discard_msg;
+
+               if (ret < 0) {
+                       if (errno == EAGAIN || errno == EINTR)
+                               ret = 0;
+                       else {
+                               ret = -errno;
+                               goto discard_msg;
+                       }
+               }
+
+               msgb_put(msg, ret);
+
+               if (ret < needed) {
+                       if (msg->len == 0) {
+                               ret = -EAGAIN;
+                               goto discard_msg;
+                       }
+
+                       LOGP(DLINP, LOGL_INFO,
+                            "Received part of IPA message header (%d/%d)\n",
+                            msg->len, sizeof(*hh));
+                       if (!tmp_msg) {
+                               ret = -EIO;
+                               goto discard_msg;
+                       }
+                       *tmp_msg = msg;
+                       return -EAGAIN;
+               }
+
+               msg->l2h = msg->tail;
+       }
+
+       hh = (struct ipaccess_head *) msg->data;
 
        /* then read the length as specified in header */
-       msg->l2h = msg->data + sizeof(*hh);
        len = ntohs(hh->len);
 
        if (len < 0 || IPA_ALLOC_SIZE < len + sizeof(*hh)) {
                LOGP(DLINP, LOGL_ERROR, "bad message length of %d bytes, "
-                                       "received %d bytes\n", len, ret);
-               msgb_free(msg);
-               return -EIO;
+                                       "received %d bytes\n", len, msg->len);
+               ret = -EIO;
+               goto discard_msg;
        }
 
-       ret = recv(fd, msg->l2h, len, 0);
-       if (ret <= 0) {
-               msgb_free(msg);
-               return ret;
-       } else if (ret < len) {
-               LOGP(DLINP, LOGL_ERROR, "truncated message received\n");
-               msgb_free(msg);
-               return -EIO;
+       needed = len - msgb_l2len(msg);
+
+       if (needed > 0) {
+               ret = recv(fd, msg->tail, needed, 0);
+
+               if (ret == 0)
+                       goto discard_msg;
+
+               if (ret < 0) {
+                       if (errno == EAGAIN || errno == EINTR)
+                               ret = 0;
+                       else {
+                               ret = -errno;
+                               goto discard_msg;
+                       }
+               }
+
+               msgb_put(msg, ret);
+
+               if (ret < needed) {
+                       LOGP(DLINP, LOGL_INFO,
+                            "Received part of IPA message L2 data (%d/%d)\n",
+                           msgb_l2len(msg), len);
+                       if (!tmp_msg) {
+                               ret = -EIO;
+                               goto discard_msg;
+                       }
+                       *tmp_msg = msg;
+                       return -EAGAIN;
+               }
        }
-       msgb_put(msg, ret);
+
+       ret = msgb_l2len(msg);
+
+       if (ret == 0) {
+               LOGP(DLINP, LOGL_INFO,
+                    "Discarding IPA message without payload\n");
+               ret = -EAGAIN;
+               goto discard_msg;
+       }
+
+       if (tmp_msg)
+               *tmp_msg = NULL;
        *rmsg = msg;
        return ret;
+
+discard_msg:
+       if (tmp_msg)
+               *tmp_msg = NULL;
+       msgb_free(msg);
+       return ret;
 }
 
 void ipa_client_conn_close(struct ipa_client_conn *link)
@@ -103,6 +183,8 @@ void ipa_client_conn_close(struct ipa_client_conn *link)
                close(link->ofd->fd);
                link->ofd->fd = -1;
        }
+       msgb_free(link->pending_msg);
+       link->pending_msg = NULL;
 }
 
 static void ipa_client_read(struct ipa_client_conn *link)
@@ -113,11 +195,12 @@ static void ipa_client_read(struct ipa_client_conn *link)
 
        LOGP(DLINP, LOGL_DEBUG, "message received\n");
 
-       ret = ipa_msg_recv(ofd->fd, &msg);
+       ret = ipa_msg_recv_buffered(ofd->fd, &msg, &link->pending_msg);
        if (ret < 0) {
-               if (errno == EPIPE || errno == ECONNRESET) {
+               if (ret == -EAGAIN)
+                       return;
+               if (ret == -EPIPE || ret == -ECONNRESET)
                        LOGP(DLINP, LOGL_ERROR, "lost connection with 
server\n");
-               }
                ipa_client_conn_close(link);
                if (link->updown_cb)
                        link->updown_cb(link, 0);
@@ -382,11 +465,12 @@ static void ipa_server_conn_read(struct ipa_server_conn 
*conn)
 
        LOGP(DLINP, LOGL_DEBUG, "message received\n");
 
-       ret = ipa_msg_recv(ofd->fd, &msg);
+       ret = ipa_msg_recv_buffered(ofd->fd, &msg, &conn->pending_msg);
        if (ret < 0) {
-               if (errno == EPIPE || errno == ECONNRESET) {
+               if (ret == -EAGAIN)
+                       return;
+               if (ret == -EPIPE || ret == -ECONNRESET)
                        LOGP(DLINP, LOGL_ERROR, "lost connection with 
server\n");
-               }
                ipa_server_conn_destroy(conn);
                return;
        } else if (ret == 0) {
@@ -471,6 +555,7 @@ ipa_server_conn_create(void *ctx, struct ipa_server_link 
*link, int fd,
 void ipa_server_conn_destroy(struct ipa_server_conn *conn)
 {
        close(conn->ofd.fd);
+       msgb_free(conn->pending_msg);
        osmo_fd_unregister(&conn->ofd);
        if (conn->closed_cb)
                conn->closed_cb(conn);
diff --git a/src/input/ipaccess.c b/src/input/ipaccess.c
index 225d70c..7ac5ad1 100644
--- a/src/input/ipaccess.c
+++ b/src/input/ipaccess.c
@@ -258,6 +258,8 @@ int ipaccess_rcvmsg_bts_base(struct msgb *msg,
 static int ipaccess_drop(struct osmo_fd *bfd, struct e1inp_line *line)
 {
        int ret = 1;
+       unsigned int ts_nr = bfd->priv_nr;
+       struct e1inp_ts *e1i_ts = &line->ts[ts_nr-1];
 
        /* Error case: we did not see any ID_RESP yet for this socket. */
        if (bfd->fd != -1) {
@@ -269,6 +271,9 @@ static int ipaccess_drop(struct osmo_fd *bfd, struct 
e1inp_line *line)
                ret = -ENOENT;
        }
 
+       msgb_free(e1i_ts->pending_msg);
+       e1i_ts->pending_msg = NULL;
+
        /* e1inp_sign_link_destroy releases the socket descriptors for us. */
        line->ops->sign_link_down(line);
 
@@ -415,13 +420,15 @@ static int handle_ts1_read(struct osmo_fd *bfd)
        struct e1inp_ts *e1i_ts = &line->ts[ts_nr-1];
        struct e1inp_sign_link *link;
        struct ipaccess_head *hh;
-       struct msgb *msg;
+       struct msgb *msg = NULL;
        int ret;
 
-       ret = ipa_msg_recv(bfd->fd, &msg);
+       ret = ipa_msg_recv_buffered(bfd->fd, &msg, &e1i_ts->pending_msg);
        if (ret < 0) {
+               if (ret == -EAGAIN)
+                       return 0;
                LOGP(DLINP, LOGL_NOTICE, "Sign link problems, "
-                       "closing socket. Reason: %s\n", strerror(errno));
+                       "closing socket. Reason: %s\n", strerror(-ret));
                goto err;
        } else if (ret == 0) {
                LOGP(DLINP, LOGL_NOTICE, "Sign link vanished, dead socket\n");
diff --git a/tests/ipa_recv/ipa_recv_test.c b/tests/ipa_recv/ipa_recv_test.c
index 7b26259..8cdc7e2 100644
--- a/tests/ipa_recv/ipa_recv_test.c
+++ b/tests/ipa_recv/ipa_recv_test.c
@@ -86,7 +86,7 @@ static void append_ipa_message(struct msgb *msg, int proto, 
const char *text)
                strcpy((char *)l2, text);
 }
 
-static int receive_messages(int fd)
+static int receive_messages(int fd, struct msgb **pending_msg)
 {
        struct msgb *msg;
        char dummy;
@@ -97,13 +97,22 @@ static int receive_messages(int fd)
                        break;
                }
                msg = NULL;
-               rc = ipa_msg_recv(fd, &msg);
-               if (rc == -1)
-                       rc = -errno;
+               rc = ipa_msg_recv_buffered(fd, &msg, pending_msg);
+
                fprintf(stderr,
-                       "ipa_msg_recv: %d, msg %s NULL\n",
-                       rc, msg ? "!=" : "==");
-               if (rc == -EAGAIN)
+                       "ipa_msg_recv_buffered: %d, msg %s NULL, "
+                       "pending_msg %s NULL\n",
+                       rc, msg ? "!=" : "==",
+                       !pending_msg ? "??" : *pending_msg ? "!=" : "==");
+               if (pending_msg && !!msg == !!*pending_msg)
+                       printf( "got msg %s NULL, pending_msg %s NULL, "
+                               "returned: %s\n",
+                               msg ?  "!=" : "==",
+                               *pending_msg ? "!=" : "==",
+                               rc == 0 ? "EOF" :
+                               rc > 0 ? "OK" :
+                               strerror(-rc));
+               else if (!pending_msg && rc == -EAGAIN)
                        printf( "got msg %s NULL, "
                                "returned: %s\n",
                                msg ?  "!=" : "==",
@@ -115,7 +124,8 @@ static int receive_messages(int fd)
                if (rc == -EAGAIN)
                        break;
                if (rc < 0) {
-                       printf("ipa_msg_recv failed with: %s\n", strerror(-rc));
+                       printf("ipa_msg_recv_buffered failed with: %s\n",
+                              strerror(-rc));
                        return rc;
                }
                printf("got IPA message, size=%d, proto=%d, text=\"%s\"\n",
@@ -142,13 +152,15 @@ static int slurp_data(int fd) {
        return count;
 };
 
-static void test_complete_recv(void)
+static void test_complete_recv(int do_not_assemble)
 {
        int sv[2];
        struct msgb *msg_out = msgb_alloc(4096, "msg_out");
+       struct msgb *pending_msg = NULL;
        int rc, i;
 
-       printf("Testing IPA recv with complete messages.\n");
+       printf("Testing IPA recv with complete messages%s.\n",
+              do_not_assemble ? "" : " with assembling enabled");
 
        if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1)
                err(1, "socketpair");
@@ -166,7 +178,11 @@ static void test_complete_recv(void)
        }
 
        for (i=0; i < ARRAY_SIZE(ipa_test_messages); i++) {
-               rc = receive_messages(sv[0]);
+               rc = receive_messages(sv[0],
+                                     do_not_assemble ? NULL : &pending_msg);
+               if (pending_msg)
+                       printf("Unexpected partial message: size=%d\n",
+                              pending_msg->len);
                if (rc == 0)
                        break;
 
@@ -181,16 +197,19 @@ static void test_complete_recv(void)
        close(sv[0]);
 
        msgb_free(msg_out);
+       msgb_free(pending_msg);
 }
 
 
-static void test_partial_recv(void)
+static void test_partial_recv(int do_not_assemble)
 {
        int sv[2];
        struct msgb *msg_out = msgb_alloc(4096, "msg_out");
+       struct msgb *pending_msg = NULL;
        int rc, i;
 
-       printf("Testing IPA recv with partitioned messages.\n");
+       printf("Testing IPA recv with partitioned messages%s.\n",
+              do_not_assemble ? "" : " with assembling enabled");
 
        if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1)
                err(1, "socketpair");
@@ -211,7 +230,8 @@ static void test_partial_recv(void)
                if (msg_out->len == 0)
                        shutdown(sv[1], SHUT_WR);
 
-               rc = receive_messages(sv[0]);
+               rc = receive_messages(sv[0],
+                                     do_not_assemble ? NULL : &pending_msg);
 
                if (rc == 0)
                        break;
@@ -226,6 +246,7 @@ static void test_partial_recv(void)
        close(sv[0]);
 
        msgb_free(msg_out);
+       msgb_free(pending_msg);
 }
 
 static struct log_info info = {};
@@ -239,8 +260,10 @@ int main(int argc, char **argv)
        printf("Testing the IPA layer.\n");
 
        /* run the tests */
-       test_complete_recv();
-       test_partial_recv();
+       test_complete_recv(1);
+       test_partial_recv(1);
+       test_complete_recv(0);
+       test_partial_recv(0);
 
        printf("No crashes.\n");
        return 0;
diff --git a/tests/ipa_recv/ipa_recv_test.ok b/tests/ipa_recv/ipa_recv_test.ok
index 4144d47..bdbfb7d 100644
--- a/tests/ipa_recv/ipa_recv_test.ok
+++ b/tests/ipa_recv/ipa_recv_test.ok
@@ -5,8 +5,31 @@ got IPA message, size=86, proto=200, text="A longer test 
message. ABCDEFGHIJKLMN
 got IPA message, size=16, proto=200, text="Hello again IPA"
 got IPA message, size=1, proto=200, text=""
 got IPA message, size=14, proto=200, text="Next is empty"
-done: unread 14, unsent 0
+got msg == NULL, returned: Resource temporarily unavailable
+got IPA message, size=4, proto=200, text="Bye"
+got IPA message, size=4, proto=200, text="Bye"
+done: unread 0, unsent 0
 Testing IPA recv with partitioned messages.
-ipa_msg_recv failed with: Input/output error
+ipa_msg_recv_buffered failed with: Input/output error
 done: unread 0, unsent 154
+Testing IPA recv with complete messages with assembling enabled.
+got IPA message, size=10, proto=200, text="Hello IPA"
+got IPA message, size=86, proto=200, text="A longer test message. 
ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz"
+got IPA message, size=16, proto=200, text="Hello again IPA"
+got IPA message, size=1, proto=200, text=""
+got IPA message, size=14, proto=200, text="Next is empty"
+got msg == NULL, pending_msg == NULL, returned: Resource temporarily 
unavailable
+got IPA message, size=4, proto=200, text="Bye"
+got IPA message, size=4, proto=200, text="Bye"
+done: unread 0, unsent 0
+Testing IPA recv with partitioned messages with assembling enabled.
+got IPA message, size=10, proto=200, text="Hello IPA"
+got IPA message, size=86, proto=200, text="A longer test message. 
ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz"
+got IPA message, size=16, proto=200, text="Hello again IPA"
+got IPA message, size=1, proto=200, text=""
+got IPA message, size=14, proto=200, text="Next is empty"
+got msg == NULL, pending_msg == NULL, returned: Resource temporarily 
unavailable
+got IPA message, size=4, proto=200, text="Bye"
+got IPA message, size=4, proto=200, text="Bye"
+done: unread 0, unsent 0
 No crashes.
-- 
1.7.9.5


Reply via email to