Below is my current work in progress, which completes the logical
separation of packets from messages.  Messages may now be larger than a
single packet, which in turn lets GET and PUT be handled as a single,
contiguous message.  DATA messages are eliminated.

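This is a major, incompatible network protocol change: struct cld_packet
gains a flags field (CPF_FIRST / CPF_LAST) marking message fragments, and
the cmo_data_s / cmo_data_c opcodes are removed.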

This work eliminates several problems, most notably (for Pete) the
"dropping dup" errors seen with PUT messages.  It also simplifies the
code nicely, because the client and server code no longer have to deal
with low-level packet details.

 include/cld_msg.h |   28 +---
 include/cldc.h    |   30 ++--
 lib/cldc.c        |  348 ++++++++++++++++++++++--------------------------------
 server/cld.h      |    9 -
 server/msg.c      |  347 +++++------------------------------------------------
 server/server.c   |   55 ++++++--
 server/session.c  |  112 ++++++++++++-----
 7 files changed, 330 insertions(+), 599 deletions(-)

diff --git a/include/cld_msg.h b/include/cld_msg.h
index e1fd9f1..2fada0f 100644
--- a/include/cld_msg.h
+++ b/include/cld_msg.h
@@ -36,7 +36,10 @@ enum {
       CLD_MAX_USERNAME        = 32,           /**< includes req. nul */
       CLD_MAX_SECRET_KEY      = 128,          /**< includes req. nul */
 
-       CLD_MAX_DATA_MSGS       = 1024,         /**< max data msgs in a stream */
+       CLD_MAX_PKT_MSG_SZ      = 1024,         /**< max msg bytes per packet */
+       CLD_MAX_PKT_MSG         = 128,          /**< max packets per msg */
+       CLD_MAX_MSG_SZ          = CLD_MAX_PKT_MSG * CLD_MAX_PKT_MSG_SZ,
+                                       /**< max total msg size, all packets */
 };
 
 /*
@@ -53,7 +56,6 @@ enum cld_msg_ops {
        cmo_open                = 2,            /**< open file */
        cmo_get_meta            = 3,            /**< get metadata */
        cmo_get                 = 4,            /**< get metadata + data */
-       cmo_data_s              = 5,            /**< data message to server */
        cmo_put                 = 6,            /**< put data */
        cmo_close               = 7,            /**< close file */
        cmo_del                 = 8,            /**< delete file */
@@ -67,7 +69,6 @@ enum cld_msg_ops {
        cmo_ping                = 30,           /**< server to client ping */
        cmo_not_master          = 31,           /**< I am not the master! */
        cmo_event               = 32,           /**< server->cli async event */
-       cmo_data_c              = 33,           /**< data message to client */
 };
 
 /** CLD error codes */
@@ -118,12 +119,19 @@ enum cld_lock_flags {
        CLF_SHARED              = (1 << 0),     /**< a shared (read) lock */
 };
 
+/** CLD packet flags */
+enum cld_packet_flags {
+       CPF_FIRST               = (1 << 0),     /**< first fragment */
+       CPF_LAST                = (1 << 1),     /**< last fragment */
+};
+
 /** header for each packet */
 struct cld_packet {
        uint8_t         magic[CLD_MAGIC_SZ];    /**< magic number; constant */
        uint64_t        seqid;                  /**< sequence id */
        uint8_t         sid[CLD_SID_SZ];        /**< client id */
-       uint8_t         res[8];
+       uint32_t        flags;                  /**< CPF_xxx flags */
+       uint8_t         res[4];
        char            user[CLD_MAX_USERNAME]; /**< authenticated user */
 };
 
@@ -184,26 +192,14 @@ struct cld_msg_get_resp {
        uint64_t                time_modify;    /**< last modification time */
        uint32_t                flags;          /**< inode flags; CIFL_xxx */
 
-       uint64_t                strid;          /**< DATA stream id */
-
        /* inode name */
 };
 
-/** DATA message */
-struct cld_msg_data {
-       struct cld_msg_hdr      hdr;
-
-       uint64_t                strid;          /**< stream id */
-       uint32_t                seg;            /**< segment number */
-       uint32_t                seg_len;        /**< segment length */
-};
-
 /** PUT message */
 struct cld_msg_put {
        struct cld_msg_hdr      hdr;
 
        uint64_t                fh;             /**< open file handle */
-       uint64_t                strid;          /**< DATA stream id */
        uint32_t                data_size;      /**< total size of data */
 };
 
diff --git a/include/cldc.h b/include/cldc.h
index b98534e..8bc8295 100644
--- a/include/cldc.h
+++ b/include/cldc.h
@@ -20,27 +20,24 @@ struct cldc_call_opts {
        union {
                struct {
                        struct cld_msg_get_resp resp;
-                       char *buf;
+                       const char *buf;
                        unsigned int size;
                        char inode_name[CLD_INODE_NAME_MAX];
                } get;
        } u;
 };
 
-/** internal per-data stream information */
-struct cldc_stream {
-       uint64_t        strid_le;       /**< stream id, LE */
-       uint32_t        size;           /**< total bytes in stream */
-       uint32_t        next_seg;       /**< next segment number expected */
-       void            *bufp;          /**< pointer to next input loc */
-       uint32_t        size_left;      /**< bytes remaining */
-       struct cldc_call_opts copts;    /**< call options */
-       char            buf[0];         /**< the raw data stream bytes */
+struct cldc_pkt_info {
+       int             pkt_len;
+       int             retries;
+
+       /* must be at end of struct */
+       struct cld_packet pkt;
+       uint8_t         data[0];
 };
 
 /** an outgoing message, from client to server */
 struct cldc_msg {
-       uint64_t        seqid;
        uint64_t        xid;
 
        struct cldc_session *sess;
@@ -54,12 +51,12 @@ struct cldc_msg {
 
        time_t          expire_time;
 
-       int             retries;
-
        int             data_len;
+       int             n_pkts;
+
+       struct cldc_pkt_info *pkt_info[CLD_MAX_PKT_MSG];
 
        /* must be at end of struct */
-       struct cld_packet pkt;
        uint8_t         data[0];
 };
 
@@ -102,8 +99,6 @@ struct cldc_session {
        GList           *out_msg;
        time_t          msg_scan_time;
 
-       GList           *streams;
-
        time_t          expire_time;
        bool            expired;
 
@@ -115,6 +110,9 @@ struct cldc_session {
        char            secret_key[CLD_MAX_SECRET_KEY];
 
        bool            confirmed;
+
+       unsigned int    msg_buf_len;
+       char            msg_buf[CLD_MAX_MSG_SZ];
 };
 
 /** Information for a single CLD server host */
diff --git a/lib/cldc.c b/lib/cldc.c
index aba5795..bb5f8eb 100644
--- a/lib/cldc.c
+++ b/lib/cldc.c
@@ -164,78 +164,6 @@ static int cldc_rx_generic(struct cldc_session *sess,
        return ack_seqid(sess, pkt->seqid);
 }
 
-static int cldc_rx_data_c(struct cldc_session *sess,
-                          const struct cld_packet *pkt,
-                          const void *msgbuf,
-                          size_t buflen)
-{
-       const struct cld_msg_data *data = msgbuf;
-       struct cldc_stream *str = NULL;
-       uint32_t seg, seg_len;
-       GList *tmp;
-       const void *p;
-
-       if (buflen < sizeof(*data))
-               return -1008;
-
-       seg = GUINT32_FROM_LE(data->seg);
-       seg_len = GUINT32_FROM_LE(data->seg_len);
-
-       if (buflen < (sizeof(*data) + seg_len))
-               return -1008;
-
-       /* look for stream w/ our strid */
-       tmp = sess->streams;
-       while (tmp) {
-               str = tmp->data;
-               if (str->strid_le == data->strid)
-                       break;
-               tmp = tmp->next;
-       }
-
-       /* if not found, return */
-       if (!tmp)
-               return -1009;
-
-       /* verify segment number is what we expect */
-       if (seg != str->next_seg) {
-               if (sess->verbose)
-                       sess->act_log("rx_data: seg mismatch %u expect %u\n",
-                                     seg, str->next_seg);
-               return -1010;
-       }
-
-       if (seg_len > str->size_left) {
-               if (sess->verbose)
-                       sess->act_log("rx_data: verflow seg_len %u left %u\n",
-                                     seg_len, str->size_left);
-               return -1011;
-       }
-
-       p = data;
-       p += sizeof(*data);
-       memcpy(str->bufp, p, seg_len);
-
-       str->bufp += seg_len;
-       str->size_left -= seg_len;
-       str->next_seg++;
-
-       /* if terminator, process completion */
-       if (seg_len == 0 && str->copts.cb) {
-               if (str->size_left)
-                       sess->act_log("rx_data: size %u short by %u\n",
-                                     str->size, str->size_left);
-               str->copts.u.get.buf = str->buf;
-               str->copts.u.get.size = str->size - str->size_left;
-               str->copts.cb(&str->copts, CLE_OK);
-               sess->streams = g_list_delete_link(sess->streams, tmp);
-               memset(str, 0, sizeof(*str));
-               free(str);
-       }
-
-       return 0;
-}
-
 static int cldc_rx_event(struct cldc_session *sess,
                         const struct cld_packet *pkt,
                         const void *msgbuf,
@@ -274,6 +202,19 @@ static int cldc_rx_not_master(struct cldc_session *sess,
        return -1055;   /* FIXME */
 }
 
+static void cldc_msg_free(struct cldc_msg *msg)
+{
+       int i;
+
+       if (!msg)
+               return;
+       
+       for (i = 0; i < CLD_MAX_PKT_MSG; i++)
+               free(msg->pkt_info[i]);
+       
+       free(msg);
+}
+
 static void sess_expire_outmsg(struct cldc_session *sess, time_t current_time)
 {
        GList *tmp, *tmp1;
@@ -287,7 +228,7 @@ static void sess_expire_outmsg(struct cldc_session *sess, 
time_t current_time)
 
                msg = tmp1->data;
                if (current_time > msg->expire_time) {
-                       free(msg);
+                       cldc_msg_free(msg);
                        sess->out_msg = g_list_delete_link(sess->out_msg, tmp1);
                }
        }
@@ -363,7 +304,6 @@ static const char *opstr(enum cld_msg_ops op)
        case cmo_open:          return "cmo_open";
        case cmo_get_meta:      return "cmo_get_meta";
        case cmo_get:           return "cmo_get";
-       case cmo_data_s:        return "cmo_data_s";
        case cmo_put:           return "cmo_put";
        case cmo_close:         return "cmo_close";
        case cmo_del:           return "cmo_del";
@@ -375,17 +315,17 @@ static const char *opstr(enum cld_msg_ops op)
        case cmo_ping:          return "cmo_ping";
        case cmo_not_master:    return "cmo_not_master";
        case cmo_event:         return "cmo_event";
-       case cmo_data_c:        return "cmo_data_c";
        default:                return "(unknown)";
        }
 }
 
 static int cldc_receive_msg(struct cldc_session *sess,
                            const struct cld_packet *pkt,
-                           size_t pkt_len,
-                           const struct cld_msg_hdr *msg,
-                           size_t msglen)
+                           size_t pkt_len)
 {
+       const struct cld_msg_hdr *msg = (struct cld_msg_hdr *) sess->msg_buf;
+       size_t msglen = sess->msg_buf_len;
+
        switch(msg->op) {
        case cmo_nop:
        case cmo_close:
@@ -397,7 +337,6 @@ static int cldc_receive_msg(struct cldc_session *sess,
        case cmo_new_sess:
        case cmo_end_sess:
        case cmo_open:
-       case cmo_data_s:
        case cmo_get_meta:
        case cmo_get:
                return cldc_rx_generic(sess, pkt, msg, msglen);
@@ -405,8 +344,6 @@ static int cldc_receive_msg(struct cldc_session *sess,
                return cldc_rx_not_master(sess, pkt, msg, msglen);
        case cmo_event:
                return cldc_rx_event(sess, pkt, msg, msglen);
-       case cmo_data_c:
-               return cldc_rx_data_c(sess, pkt, msg, msglen);
        case cmo_ping:
                return ack_seqid(sess, pkt->seqid);
        case cmo_ack:
@@ -427,11 +364,13 @@ int cldc_receive_pkt(struct cldc_session *sess,
        struct timeval tv;
        time_t current_time;
        uint64_t seqid;
+       uint32_t pkt_flags;
+       bool first_frag, last_frag, have_new_sess, have_not_master;
 
        gettimeofday(&tv, NULL);
        current_time = tv.tv_sec;
 
-       if (pkt_len < (sizeof(*pkt) + sizeof(*msg) + SHA_DIGEST_LENGTH)) {
+       if (pkt_len < (sizeof(*pkt) + SHA_DIGEST_LENGTH)) {
                if (sess->verbose)
                        sess->act_log("receive_pkt: msg too short\n");
                return -EPROTO;
@@ -447,16 +386,6 @@ int cldc_receive_pkt(struct cldc_session *sess,
                                (unsigned long long) 
GUINT64_FROM_LE(pkt->seqid),
                                pkt->user,
                                GUINT32_FROM_LE(dp->size));
-               } else if (msg->op == cmo_data_c) {
-                       struct cld_msg_data *dp;
-                       dp = (struct cld_msg_data *) msg;
-                       sess->act_log("receive pkt: len %u, op cmo_data_c"
-                               ", seqid %llu, user %s, seg %u, len %u\n",
-                               (unsigned int) pkt_len,
-                               (unsigned long long) 
GUINT64_FROM_LE(pkt->seqid),
-                               pkt->user,
-                               GUINT32_FROM_LE(dp->seg),
-                               GUINT32_FROM_LE(dp->seg_len));
                } else {
                        sess->act_log("receive pkt: len %u, "
                                "op %s, seqid %llu, user %s\n",
@@ -472,11 +401,6 @@ int cldc_receive_pkt(struct cldc_session *sess,
                        sess->act_log("receive_pkt: bad pkt magic\n");
                return -EPROTO;
        }
-       if (memcmp(msg->magic, CLD_MSG_MAGIC, sizeof(msg->magic))) {
-               if (sess->verbose)
-                       sess->act_log("receive_pkt: bad msg magic\n");
-               return -EPROTO;
-       }
 
        /* check HMAC signature */
        if (!authcheck(sess, pkt, pkt_len)) {
@@ -497,9 +421,30 @@ int cldc_receive_pkt(struct cldc_session *sess,
       if (current_time >= sess->msg_scan_time)
               sess_expire_outmsg(sess, current_time);
 
+       pkt_flags = GUINT32_FROM_LE(pkt->flags);
+       first_frag = pkt_flags & CPF_FIRST;
+       last_frag = pkt_flags & CPF_LAST;
+
+       /* only the first fragment of a message starts with a msg header */
+       if (first_frag) {
+               if (msglen < sizeof(*msg)) {
+                       if (sess->verbose)
+                               sess->act_log("receive_pkt: msg too short\n");
+                       return -EPROTO;
+               }
+               if (memcmp(msg->magic, CLD_MSG_MAGIC, sizeof(msg->magic))) {
+                       if (sess->verbose)
+                               sess->act_log("receive_pkt: bad msg magic\n");
+                       return -EPROTO;
+               }
+       }
+
+       have_new_sess = first_frag && (msg->op == cmo_new_sess);
+       have_not_master = first_frag && (msg->op == cmo_not_master);
+
       /* verify (or set, for new-sess) sequence id */
       seqid = GUINT64_FROM_LE(pkt->seqid);
-       if (msg->op == cmo_new_sess) {
+       if (have_new_sess) {
               sess->next_seqid_in = seqid + 1;
               sess->next_seqid_in_tr =
                       sess->next_seqid_in - CLDC_MSG_REMEMBER;
@@ -508,7 +453,7 @@ int cldc_receive_pkt(struct cldc_session *sess,
                       sess->act_log("receive_pkt: "
                                        "setting next_seqid_in to %llu\n",
                                        (unsigned long long) seqid);
-       } else if (msg->op != cmo_not_master) {
+       } else if (!have_not_master) {
               if (seqid != sess->next_seqid_in) {
                       if (seqid_in_range(seqid,
                                          sess->next_seqid_in_tr,
@@ -526,7 +471,27 @@ int cldc_receive_pkt(struct cldc_session *sess,
 
       sess->expire_time = current_time + CLDC_SESS_EXPIRE;
 
-       return cldc_receive_msg(sess, pkt, pkt_len, msg, msglen);
+       /*
+        * Reassemble only after the sequence id check above, so packets
+        * that check rejects never reset or append to msg_buf.
+        */
+       if (first_frag)
+               sess->msg_buf_len = 0;
+
+       if ((sess->msg_buf_len + msglen) > CLD_MAX_MSG_SZ) {
+               sess->msg_buf_len = 0;
+               if (sess->verbose)
+                       sess->act_log("receive_pkt: bad pkt length\n");
+               return -EPROTO;
+       }
+
+       memcpy(sess->msg_buf + sess->msg_buf_len, msg, msglen);
+       sess->msg_buf_len += msglen;
+
+       if (!last_frag)
+               return 0;
+
+       return cldc_receive_msg(sess, pkt, pkt_len);
 }
 
 static void sess_next_seqid(struct cldc_session *sess, uint64_t *seqid)
@@ -543,25 +494,57 @@ static struct cldc_msg *cldc_new_msg(struct cldc_session *sess,
       struct cldc_msg *msg;
       struct cld_msg_hdr *hdr;
       struct timeval tv;
+       int i, data_left;
 
       gettimeofday(&tv, NULL);
 
-       msg = calloc(1, sizeof(*msg) + msg_len + SHA_DIGEST_LENGTH);
+       msg = calloc(1, sizeof(*msg) + msg_len);
       if (!msg)
               return NULL;
 
+       __cld_rand64(&msg->xid);
+
       msg->sess = sess;
+
+       if (copts)
+               memcpy(&msg->copts, copts, sizeof(msg->copts));
+
       msg->expire_time = tv.tv_sec + CLDC_MSG_EXPIRE;
 
-       sess_next_seqid(sess, &msg->seqid);
+       msg->data_len = msg_len;
 
-       __cld_rand64(&msg->xid);
+       msg->n_pkts = msg_len / CLD_MAX_PKT_MSG_SZ;
+       msg->n_pkts += ((msg_len % CLD_MAX_PKT_MSG_SZ) ? 1 : 0);
+
+       /* pkt_info[] has room for at most CLD_MAX_PKT_MSG packets */
+       if (msg->n_pkts > CLD_MAX_PKT_MSG)
+               goto err_out;
 
-       msg->data_len = msg_len + SHA_DIGEST_LENGTH;
-       if (copts)
-               memcpy(&msg->copts, copts, sizeof(msg->copts));
+       data_left = msg_len;
+       for (i = 0; i < msg->n_pkts; i++) {
+               struct cldc_pkt_info *pi;
+               int pkt_len;
+
+               pkt_len = MIN(data_left, CLD_MAX_PKT_MSG_SZ);
+               data_left -= pkt_len;
+
+               pi = calloc(1, sizeof(*pi) + pkt_len + SHA_DIGEST_LENGTH);
+               if (!pi)
+                       goto err_out;
+
+               pi->pkt_len = pkt_len;
+
+               memcpy(pi->pkt.magic, CLD_PKT_MAGIC, CLD_MAGIC_SZ);
+               memcpy(pi->pkt.sid, sess->sid, CLD_SID_SZ);
+               strncpy(pi->pkt.user, sess->user, CLD_MAX_USERNAME - 1);
+
+               if (i == 0)
+                       pi->pkt.flags |= GUINT32_TO_LE(CPF_FIRST);
+               if (i == (msg->n_pkts - 1))
+                       pi->pkt.flags |= GUINT32_TO_LE(CPF_LAST);
 
-       msg->pkt.seqid = msg->seqid;
+               msg->pkt_info[i] = pi;
+       }
 
       hdr = (struct cld_msg_hdr *) &msg->data[0];
       memcpy(&hdr->magic, CLD_MSG_MAGIC, CLD_MAGIC_SZ);
@@ -569,6 +552,10 @@ static struct cldc_msg *cldc_new_msg(struct cldc_session *sess,
       hdr->xid = msg->xid;
 
       return msg;
+
+err_out:
+       cldc_msg_free(msg);
+       return NULL;
 }
 
 static void sess_msg_drop(struct cldc_session *sess)
@@ -583,7 +567,7 @@ static void sess_msg_drop(struct cldc_session *sess)
                if (!msg->done && msg->cb)
                        msg->cb(msg, NULL, 0, false);
 
-               free(msg);
+               cldc_msg_free(msg);
        }
 
        g_list_free(sess->out_msg);
@@ -614,17 +598,28 @@ static int sess_timer(struct cldc_session *sess, void 
*priv)
        }
 
        while (tmp) {
+               int i;
+
                msg = tmp->data;
                tmp = tmp->next;
 
                if (msg->done)
                        continue;
 
-               msg->retries++;
-               sess->ops->pkt_send(sess->private,
-                              sess->addr, sess->addr_len,
-                              &msg->pkt,
-                              sizeof(msg->pkt) + msg->data_len);
+               for (i = 0; i < msg->n_pkts; i++) {
+                       struct cldc_pkt_info *pi;
+                       int total_pkt_len;
+
+                       pi = msg->pkt_info[i];
+                       total_pkt_len = sizeof(struct cld_packet) +
+                                       pi->pkt_len + SHA_DIGEST_LENGTH;
+
+                       pi->retries++;
+
+                       sess->ops->pkt_send(sess->private,
+                                      sess->addr, sess->addr_len,
+                                      &pi->pkt, total_pkt_len);
+               }
        }
 
        sess->ops->timer_ctl(sess->private, true, sess_timer, sess,
@@ -635,46 +630,39 @@ static int sess_timer(struct cldc_session *sess, void *priv)
 static int sess_send(struct cldc_session *sess,
                     struct cldc_msg *msg)
 {
-       struct cld_packet *pkt = &msg->pkt;
-
-       memcpy(pkt->magic, CLD_PKT_MAGIC, CLD_MAGIC_SZ);
-       memcpy(pkt->sid, sess->sid, CLD_SID_SZ);
-       strncpy(pkt->user, sess->user, CLD_MAX_USERNAME - 1);
-
-       /* sign message */
-       if (!authsign(sess, pkt, sizeof(*pkt) + msg->data_len))
-               return -1;
-
-       /* add to list of outgoing packets, waiting to be ack'd */
-       sess->out_msg = g_list_append(sess->out_msg, msg);
-
-       /* attempt first send */
-       if (sess->ops->pkt_send(sess->private,
-                      sess->addr, sess->addr_len,
-                      pkt, sizeof(*pkt) + msg->data_len) < 0)
-               return -1;
-
-       return 0;
-}
-
-static int sess_stream_open(struct cldc_session *sess,
-                            uint64_t strid_le,
-                            uint32_t size,
-                            const struct cldc_call_opts *copts)
-{
-       struct cldc_stream *str;
-
-       str = calloc(1, sizeof(*str) + size);
-       if (!str)
-               return -ENOMEM;
-
-       str->strid_le = strid_le;
-       str->size = size;
-       str->size_left = size;
-       str->bufp = &str->buf[0];
-       memcpy(&str->copts, copts, sizeof(*copts));
-
-       sess->streams = g_list_append(sess->streams, str);
-
-       return 0;
+       const uint8_t *p = msg->data;
+       int i, rc = 0;
+
+       /* fill in, number and sign every packet before sending any */
+       for (i = 0; i < msg->n_pkts; i++) {
+               struct cldc_pkt_info *pi = msg->pkt_info[i];
+
+               memcpy(pi->data, p, pi->pkt_len);
+               p += pi->pkt_len;
+
+               sess_next_seqid(sess, &pi->pkt.seqid);
+
+               if (!authsign(sess, &pi->pkt, sizeof(struct cld_packet) +
+                                             pi->pkt_len + SHA_DIGEST_LENGTH))
+                       return -1;
+       }
+
+       /*
+        * Queue before the first send, so that sess_timer() retransmits
+        * the message even if one of the sends below fails.
+        */
+       sess->out_msg = g_list_append(sess->out_msg, msg);
+
+       /* attempt first send of every packet */
+       for (i = 0; i < msg->n_pkts; i++) {
+               struct cldc_pkt_info *pi = msg->pkt_info[i];
+
+               if (sess->ops->pkt_send(sess->private,
+                              sess->addr, sess->addr_len,
+                              &pi->pkt, sizeof(struct cld_packet) +
+                              pi->pkt_len + SHA_DIGEST_LENGTH) < 0)
+                       rc = -1;
+       }
+
+       return rc;
 }
@@ -696,13 +683,6 @@ static void sess_free(struct cldc_session *sess)
        }
        g_list_free(sess->out_msg);
 
-       tmp = sess->streams;
-       while (tmp) {
-               free(tmp->data);
-               tmp = tmp->next;
-       }
-       g_list_free(sess->streams);
-
        memset(sess, 0, sizeof(*sess));
        free(sess);
 }
@@ -1082,10 +1062,7 @@ int cldc_put(struct cldc_fh *fh, const struct 
cldc_call_opts *copts,
        struct cldc_session *sess;
        struct cldc_msg *msg;
        struct cld_msg_put *put;
-       struct cldc_msg *datamsg[CLDC_MAX_DATA_PKTS];
-       int n_pkts, i, copy_len;
-       const void *p;
-       size_t data_len_left = data_len;
+       int n_pkts;
 
        if (!data || !data_len || data_len > CLDC_MAX_DATA_SZ)
                return -EINVAL;
@@ -1101,58 +1078,20 @@ int cldc_put(struct cldc_fh *fh, const struct 
cldc_call_opts *copts,
        sess = fh->sess;
 
        /* create PUT message */
-       msg = cldc_new_msg(sess, copts, cmo_put, sizeof(struct cld_msg_put));
+       msg = cldc_new_msg(sess, copts, cmo_put,
+                          sizeof(struct cld_msg_put) + data_len);
        if (!msg)
                return -ENOMEM;
 
        put = (struct cld_msg_put *) msg->data;
        put->fh = fh->fh_le;
-       __cld_rand64(&put->strid);
        put->data_size = GUINT32_TO_LE(data_len);
 
-       memset(datamsg, 0, sizeof(datamsg));
-
-       p = data;
-       for (i = 0; i < n_pkts; i++) {
-               struct cld_msg_data *dm;
-               void *q;
-
-               /* create DATA message for this segment */
-               datamsg[i] = cldc_new_msg(sess, copts, cmo_data_s,
-                                         CLDC_MAX_DATA_PKT_SZ);
-               if (!datamsg[i])
-                       goto err_out;
-
-               if (i == (n_pkts - 1))
-                       datamsg[i]->cb = generic_end_cb;
-
-               dm = (struct cld_msg_data *) datamsg[i]->data;
-               q = dm;
-               q += sizeof(struct cld_msg_data);
-
-               copy_len = MIN(CLDC_MAX_DATA_PKT_SZ, data_len_left);
-               memcpy(q, p, copy_len);
-
-               p += copy_len;
-               data_len_left -= copy_len;
-
-               dm->strid = put->strid;
-               dm->seg = GUINT32_TO_LE(i);
-               dm->seg_len = GUINT32_TO_LE(copy_len);
-       }
+       memcpy((put + 1), data, data_len);
 
        sess_send(sess, msg);
-       for (i = 0; i < n_pkts; i++)
-               sess_send(sess, datamsg[i]);
 
        return 0;
-
-err_out:
-       for (i = 0; i < n_pkts; i++)
-               if (datamsg[i])
-                       free(datamsg[i]);
-       free(msg);
-       return -ENOMEM;
 }
 
 #undef XC32
@@ -1175,8 +1114,11 @@ static ssize_t get_end_cb(struct cldc_msg *msg, const void *resp_p,
               resp_rc = GUINT32_FROM_LE(resp->resp.code);
 
       if (resp_rc == CLE_OK) {
+               bool get_body;
+
               o = &msg->copts.u.get.resp;
 
+               get_body = (resp->resp.hdr.op == cmo_get);
               msg->copts.op = cmo_get;
 
               /* copy-and-swap */
@@ -1190,10 +1132,24 @@ static ssize_t get_end_cb(struct cldc_msg *msg, const void *resp_p,
 
               /* copy inode name */
-               if (o->ino_len <= sizeof(CLD_INODE_NAME_MAX)) {
+               if (o->ino_len <= CLD_INODE_NAME_MAX &&
+                   sizeof(struct cld_msg_get_resp) + o->ino_len <=
+                   msg->sess->msg_buf_len) {
+                       size_t diffsz;
                       const void *p;
+
                       p = resp;
                       p += sizeof(struct cld_msg_get_resp);
                       memcpy(&msg->copts.u.get.inode_name, p, o->ino_len);
+
+                       p += o->ino_len;
+                       diffsz = p - resp_p;
+
+                       /* only GET (not GET-meta) carries data after the name */
+                       if (get_body) {
+                               msg->copts.u.get.buf = msg->sess->msg_buf + diffsz;
+                               msg->copts.u.get.size =
+                                       msg->sess->msg_buf_len - diffsz;
+                       }
               } else {
                       o->ino_len = 0;         /* Probably full of garbage */
               }
@@ -1206,9 +1162,6 @@ static ssize_t get_end_cb(struct cldc_msg *msg, const void *resp_p,
               return 0;
       }
 
-       sess_stream_open(msg->sess, resp->strid, o->size,
-                        &msg->copts);
-
       return 0;
 }
 #undef XC
diff --git a/server/cld.h b/server/cld.h
index 21f103d..687ab80 100644
--- a/server/cld.h
+++ b/server/cld.h
@@ -70,15 +70,16 @@ struct session {
        uint64_t                next_seqid_in;
        uint64_t                next_seqid_out;
 
-       GList                   *put_q;         /* queued PUT pkts */
-       GList                   *data_q;        /* queued data pkts */
-
        GList                   *out_q;         /* outgoing pkts (to client) */
        struct timer            retry_timer;
 
        char                    user[CLD_MAX_USERNAME];
 
        bool                    ping_open;      /* sent PING, waiting for ack */
+
+       /* huge buffer should always come last */
+       unsigned int            msg_buf_len;
+       char                    msg_buf[CLD_MAX_MSG_SZ];
 };
 
 struct msg_params {
@@ -133,7 +134,6 @@ struct server {
 extern int inode_lock_rescan(DB_TXN *txn, cldino_t inum);
 extern void msg_open(struct msg_params *);
 extern void msg_put(struct msg_params *);
-extern void msg_data(struct msg_params *);
 extern void msg_close(struct msg_params *);
 extern void msg_del(struct msg_params *);
 extern void msg_unlock(struct msg_params *);
@@ -143,7 +143,6 @@ extern void msg_get(struct msg_params *, bool);
 
 /* session.c */
 extern uint64_t next_seqid_le(uint64_t *seq);
-extern void pkt_init_sess(struct cld_packet *dest, struct session *sess);
 extern void pkt_init_pkt(struct cld_packet *dest, const struct cld_packet 
*src);
 extern guint sess_hash(gconstpointer v);
 extern gboolean sess_equal(gconstpointer _a, gconstpointer _b);
diff --git a/server/msg.c b/server/msg.c
index 4f0e583..fcb8927 100644
--- a/server/msg.c
+++ b/server/msg.c
@@ -447,7 +447,8 @@ void msg_get(struct msg_params *mp, bool metadata_only)
 
       name_len = GUINT32_FROM_LE(inode->ino_len);
 
-       resp_len = sizeof(*resp) + name_len + SHA_DIGEST_LENGTH;
+       resp_len = sizeof(*resp) + name_len + SHA_DIGEST_LENGTH +
+                  (metadata_only ? 0 : GUINT32_FROM_LE(inode->size));
       resp = alloca(resp_len);
       if (!resp) {
               resp_rc = CLE_OOM;
@@ -463,17 +464,11 @@ void msg_get(struct msg_params *mp, bool metadata_only)
       resp->time_create = inode->time_create;
       resp->time_modify = inode->time_modify;
       resp->flags = inode->flags;
-       resp->strid = rand_strid;
       memcpy(resp+1, inode+1, name_len);
 
-       sess_sendmsg(sess, resp, resp_len, NULL, NULL);
-
-       /* send one or more data packets, if necessary */
+       /* append file data to the response, if requested */
       if (!metadata_only) {
-               int i, seg_len;
               void *p;
-               char dbuf[CLD_MAX_UDP_SEG];
-               struct cld_msg_data *dr = (struct cld_msg_data *) &dbuf;
 
               rc = cldb_data_get(txn, inum, &data_mem, &data_mem_len,
                                  false, false);
@@ -484,41 +479,26 @@ void msg_get(struct msg_params *mp, bool metadata_only)
               if (rc == DB_NOTFOUND) {
                       data_mem = NULL;
                       data_mem_len = 0;
               } else if (rc) {
                       resp_rc = CLE_DB_ERR;
                       goto err_out;
               }
 
-               /* copy the GET msg's hdr, then change op to DATA */
-               memset(dr, 0, sizeof(*dr));
-               dr->hdr.op = cmo_data_c;
-
-               i = 0;
-               p = data_mem;
-
-               /* break up data_mem into individual packets */
-               while (data_mem_len > 0) {
-                       seg_len = MIN(CLD_MAX_UDP_SEG - sizeof(*dr), data_mem_len);
-
-                       dr->strid = rand_strid;
-                       dr->seg = GUINT32_TO_LE(i);
-                       dr->seg_len = GUINT32_TO_LE(seg_len);
-                       memcpy(dbuf + sizeof(*dr), p, seg_len);
-
-                       i++;
-                       p += seg_len;
-                       data_mem_len -= seg_len;
-
-                       sess_sendmsg(sess, dr, seg_len + sizeof(*dr), NULL, NULL);
-               }
-
-               /* send terminating packet (seg_len == 0) */
-               dr->strid = rand_strid;
-               dr->seg = GUINT32_TO_LE(i);
-               dr->seg_len = 0;
-               sess_sendmsg(sess, dr, sizeof(*dr), NULL, NULL);
+               /* resp_len above reserved exactly inode->size data bytes */
+               if (data_mem_len != GUINT32_FROM_LE(inode->size)) {
+                       resp_rc = CLE_DB_ERR;
+                       goto err_out;
+               }
+
+               if (data_mem_len) {
+                       p = (resp + 1);
+                       p += name_len;
+                       memcpy(p, data_mem, data_mem_len);
+               }
       }
 
+       sess_sendmsg(sess, resp, resp_len, NULL, NULL);
+
       rc = txn->commit(txn, 0);
       if (rc)
               dbenv->err(dbenv, rc, "msg_get read-only txn commit");
@@ -752,122 +724,33 @@ err_out_noabort:
        free(raw_sess);
 }
 
-static void try_commit_data(struct msg_params *mp,
-                       uint64_t strid, GList *pmsg_ent)
+void msg_put(struct msg_params *mp)
 {
-       struct cld_msg_put *pmsg = pmsg_ent->data;
-       struct cld_msg_data *dmsg;
-       GList *tmp, *tmp1;
-       uint32_t data_size, tmp_size, tmp_seg = 0;
-       int last_seg, nseg, rc, i;
+       const struct cld_msg_put *msg = mp->msg;
+       struct session *sess = mp->sess;
+       uint64_t fh;
        struct raw_handle *h = NULL;
        struct raw_inode *inode = NULL;
-       cldino_t inum;
-       uint64_t fh;
        enum cle_err_codes resp_rc = CLE_OK;
-       void *mem, *p, *q;
-       struct cld_msg_data **darr;
-       struct session *sess = mp->sess;
-       bool have_end_seg = false;
+       const void *mem;
+       int rc;
+       cldino_t inum;
+       uint32_t omode, data_size;
        DB_ENV *dbenv = cld_srv.cldb.env;
        DB_TXN *txn;
 
-       data_size = GUINT32_FROM_LE(pmsg->data_size);
-       tmp_size = 0;
-       last_seg = 0;
-       nseg = 0;
-
-       /*
-        * Pass 1: count total size of all packets in our stream;
-        * count number of segments in stream.
-        */
-       tmp = sess->data_q;
-       while (tmp) {
-               uint32_t tmp_seg_len;
-
-               dmsg = tmp->data;
-               tmp = tmp->next;
-
-               /* non-matching strid[] implies not-our-stream */
-               if (dmsg->strid != strid)
-                       continue;
-
-               tmp_seg = GUINT32_FROM_LE(dmsg->seg);
-               if (tmp_seg >= CLD_MAX_DATA_MSGS)
-                       break;
-               if (tmp_seg > last_seg)
-                       last_seg = tmp_seg;
-
-               tmp_seg_len = GUINT32_FROM_LE(dmsg->seg_len);
-               if (tmp_seg_len == 0)
-                       have_end_seg = true;
-               else
-                       tmp_size += tmp_seg_len;
-               nseg++;
-               if (nseg > CLD_MAX_DATA_MSGS)
-                       break;
-       }
-
-       if (debugging)
-               cldlog(LOG_DEBUG,
-                      "    data scan: end %d nseg %u last %u len %u/%u",
-                      have_end_seg, nseg, last_seg, tmp_size, data_size);
-
-       /* return if data stream not yet 100% received */
-       if (!have_end_seg || tmp_size < data_size)
-               return;         /* nothing to do */
+       /* make sure input data as large as message header */
+       if (mp->msg_len < sizeof(*msg))
+               return;
 
-       /* stream parameter bounds checking */
-       if ((tmp_seg >= CLD_MAX_DATA_MSGS) ||
-           (nseg > CLD_MAX_DATA_MSGS) ||
-           (tmp_size > data_size)) {
-               resp_rc = CLE_DATA_INVAL;
+       /* make sure additional input data as large as expected */
+       data_size = GUINT32_FROM_LE(msg->data_size);
+       if (mp->msg_len != (data_size + sizeof(*msg))) {
+               resp_rc = CLE_BAD_PKT;
                goto err_out_noabort;
        }
 
-       /* create array to store pointers to each data packet */
-       darr = alloca(nseg * sizeof(struct cld_msg_data *));
-       memset(darr, 0, nseg * sizeof(struct cld_msg_data *));
-
-       sess->put_q = g_list_delete_link(sess->put_q, pmsg_ent);
-
-       /*
-        * Pass 2: store packets in array, sorted by segment number
-        */
-       tmp = sess->data_q;
-       while (tmp) {
-               dmsg = tmp->data;
-               tmp1 = tmp;
-               tmp = tmp->next;
-
-               /* non-matching strid[] implies not-our-stream */
-               if (dmsg->strid != strid)
-                       continue;
-
-               /* remove data packet from data msg queue */
-               sess->data_q = g_list_delete_link(sess->data_q, tmp1);
-
-               tmp_seg = GUINT32_FROM_LE(dmsg->seg);
-
-               /* prevent duplicate segment numbers */
-               if (darr[tmp_seg]) {
-                       resp_rc = CLE_DATA_INVAL;
-                       goto err_out_noabort;
-               }
-               darr[tmp_seg] = dmsg;
-       }
-
-       /* final check for missing segments; if segments are missing
-        * at this point, it is a corrupted/malicious data stream,
-        * because it passed other checks following Pass #1
-        */
-       for (i = 0; i < nseg; i++)
-               if (!darr[i]) {
-                       resp_rc = CLE_DATA_INVAL;
-                       goto err_out_noabort;
-               }
-
-       fh = GUINT64_FROM_LE(pmsg->fh);
+       fh = GUINT64_FROM_LE(msg->fh);
 
        rc = dbenv->txn_begin(dbenv, NULL, &txn, 0);
        if (rc) {
@@ -884,6 +767,13 @@ static void try_commit_data(struct msg_params *mp,
        }
 
        inum = cldino_from_le(h->inum);
+       omode = GUINT32_FROM_LE(h->mode);
+
+       if ((!(omode & COM_WRITE)) ||
+           (omode & COM_DIRECTORY)) {
+               resp_rc = CLE_MODE_INVAL;
+               goto err_out;
+       }
 
        /* read inode from db */
        rc = cldb_inode_get(txn, inum, &inode, false, DB_RMW);
@@ -892,26 +782,8 @@ static void try_commit_data(struct msg_params *mp,
                goto err_out;
        }
 
-       /* create contig. memory area sized to contain entire data stream */
-       p = mem = malloc(data_size);
-       if (!mem) {
-               resp_rc = CLE_OOM;
-               goto err_out;
-       }
-
-       /* loop through array, copying each data packet into contig. area */
-       for (i = 0; i <= last_seg; i++) {
-               dmsg = darr[i];
-               q = dmsg;
-
-               tmp_size = GUINT32_FROM_LE(dmsg->seg_len);
-               memcpy(p, q + sizeof(*dmsg), tmp_size);
-               p += tmp_size;
-
-               free(dmsg);
-       }
-
        /* store contig. data area in db */
+       mem = (msg + 1);
        rc = cldb_data_put(txn, inum, mem, data_size, 0);
        if (rc) {
                resp_rc = CLE_DB_ERR;
@@ -934,159 +806,6 @@ static void try_commit_data(struct msg_params *mp,
                goto err_out_noabort;
        }
 
-       resp_ok(sess, mp->msg);
-       free(pmsg);
-       free(h);
-       free(inode);
-       return;
-
-err_out:
-       rc = txn->abort(txn);
-       if (rc)
-               dbenv->err(dbenv, rc, "commit txn abort");
-err_out_noabort:
-       resp_err(sess, mp->msg, resp_rc);
-       free(pmsg);
-       free(h);
-       free(inode);
-}
-
-void msg_data(struct msg_params *mp)
-{
-       const struct cld_msg_data *msg = mp->msg;
-       struct session *sess = mp->sess;
-       GList *tmp;
-       void *mem = NULL;
-       enum cle_err_codes resp_rc = CLE_OK;
-       uint32_t seg_len;
-
-       /* make sure input data as large as expected */
-       if (mp->msg_len < sizeof(*msg))
-               return;
-
-       seg_len = GUINT32_FROM_LE(msg->seg_len);
-
-       if (mp->msg_len < (sizeof(*msg) + seg_len))
-               return;
-
-       if (debugging)
-               cldlog(LOG_DEBUG, "    data strid %016llx",
-                      (unsigned long long) msg->strid);
-
-       /* search for PUT message with strid == our strid; that is how we
-        * associate DATA messages with the initial PUT msg
-        */
-       tmp = sess->put_q;
-       while (tmp) {
-               struct cld_msg_put *pmsg;
-
-               pmsg = tmp->data;
-               if (pmsg->strid == msg->strid)
-                       break;
-
-               tmp = tmp->next;
-       }
-
-       if (!tmp) {
-               resp_rc = CLE_DATA_INVAL;
-               goto err_out;
-       }
-
-       /* copy DATA msg */
-       mem = malloc(mp->msg_len);
-       if (!mem) {
-               resp_rc = CLE_OOM;
-               goto err_out;
-       }
-
-       memcpy(mem, mp->msg, mp->msg_len);
-
-       /* store DATA message on DATA msg queue */
-       sess->data_q = g_list_append(sess->data_q, mem);
-
-       resp_ok(sess, mp->msg);
-
-       /* scan DATA queue for completed stream; commit to db, if found */
-       try_commit_data(mp, msg->strid, tmp);
-       return;
-
-err_out:
-       resp_err(sess, mp->msg, resp_rc);
-}
-
-void msg_put(struct msg_params *mp)
-{
-       const struct cld_msg_put *msg = mp->msg;
-       struct session *sess = mp->sess;
-       uint64_t fh;
-       struct raw_handle *h = NULL;
-       struct raw_inode *inode = NULL;
-       enum cle_err_codes resp_rc = CLE_OK;
-       void *mem;
-       int rc;
-       cldino_t inum;
-       uint32_t omode;
-       DB_ENV *dbenv = cld_srv.cldb.env;
-       DB_TXN *txn;
-
-       /* make sure input data as large as expected */
-       if (mp->msg_len < sizeof(*msg))
-               return;
-
-       fh = GUINT64_FROM_LE(msg->fh);
-
-       rc = dbenv->txn_begin(dbenv, NULL, &txn, 0);
-       if (rc) {
-               dbenv->err(dbenv, rc, "DB_ENV->txn_begin");
-               resp_rc = CLE_DB_ERR;
-               goto err_out_noabort;
-       }
-
-       /* read handle from db, for validation */
-       rc = cldb_handle_get(txn, sess->sid, fh, &h, 0);
-       if (rc) {
-               resp_rc = CLE_FH_INVAL;
-               goto err_out;
-       }
-
-       inum = cldino_from_le(h->inum);
-       omode = GUINT32_FROM_LE(h->mode);
-
-       if ((!(omode & COM_WRITE)) ||
-           (omode & COM_DIRECTORY)) {
-               resp_rc = CLE_MODE_INVAL;
-               goto err_out;
-       }
-
-       /* read inode from db, for validation */
-       rc = cldb_inode_get(txn, inum, &inode, false, 0);
-       if (rc) {
-               resp_rc = CLE_INODE_INVAL;
-               goto err_out;
-       }
-
-       rc = txn->commit(txn, 0);
-       if (rc)
-               dbenv->err(dbenv, rc, "msg_put read-only txn commit");
-
-       /* copy message */
-       mem = malloc(mp->msg_len);
-       if (!mem) {
-               resp_rc = CLE_OOM;
-               goto err_out;
-       }
-
-       memcpy(mem, msg, mp->msg_len);
-
-       /* store PUT message in PUT msg queue */
-       sess->put_q = g_list_append(sess->put_q, mem);
-
-       /*
-        * In all other cases we ack here, but in put we do it in
-        * try_to_commit_data, so that we can report the result of commit.
-        */
-       // resp_ok(sess, mp->msg);
-
        free(h);
        free(inode);
        return;
diff --git a/server/server.c b/server/server.c
index b6e1332..9f4719d 100644
--- a/server/server.c
+++ b/server/server.c
@@ -217,7 +217,6 @@ const char *opstr(enum cld_msg_ops op)
        case cmo_open:          return "cmo_open";
        case cmo_get_meta:      return "cmo_get_meta";
        case cmo_get:           return "cmo_get";
-       case cmo_data_s:        return "cmo_data_s";
        case cmo_put:           return "cmo_put";
        case cmo_close:         return "cmo_close";
        case cmo_del:           return "cmo_del";
@@ -229,7 +228,6 @@ const char *opstr(enum cld_msg_ops op)
        case cmo_ping:          return "cmo_ping";
        case cmo_not_master:    return "cmo_not_master";
        case cmo_event:         return "cmo_event";
-       case cmo_data_c:        return "cmo_data_c";
        default:                return "(unknown)";
        }
 }
@@ -250,7 +248,6 @@ static void udp_rx_msg(const struct client *cli, const 
struct cld_packet *pkt,
        case cmo_get:           msg_get(mp, false); break;
        case cmo_get_meta:      msg_get(mp, true); break;
        case cmo_put:           msg_put(mp); break;
-       case cmo_data_s:        msg_data(mp); break;
        case cmo_close:         msg_close(mp); break;
        case cmo_del:           msg_del(mp); break;
        case cmo_unlock:        msg_unlock(mp); break;
@@ -276,6 +273,8 @@ static void udp_rx(struct server_socket *sock,
        struct cld_msg_resp *resp;
        struct msg_params mp;
        size_t alloc_len;
+       uint32_t pkt_flags;
+       bool first_frag, have_new_sess, have_ack, have_put;
 
        /* drop all completely corrupted packets */
        if ((pkt_len < (sizeof(*pkt) + sizeof(*msg) + SHA_DIGEST_LENGTH)) ||
@@ -289,6 +288,12 @@ static void udp_rx(struct server_socket *sock,
                goto err_out;
        }
 
+       pkt_flags = GUINT32_FROM_LE(pkt->flags);
+       first_frag = pkt_flags & CPF_FIRST;
+       have_new_sess = first_frag && (msg->op == cmo_new_sess);
+       have_ack = first_frag && (msg->op == cmo_ack);
+       have_put = first_frag && (msg->op == cmo_put);
+
        /* look up client session, verify it matches IP and username */
        sess = g_hash_table_lookup(cld_srv.sessions, pkt->sid);
        if (sess &&
@@ -307,28 +312,24 @@ static void udp_rx(struct server_socket *sock,
        mp.msg_len = pkt_len - sizeof(*pkt);
 
        if (debugging) {
-               if (msg->op == cmo_data_s) {
-                       struct cld_msg_data *dp = (struct cld_msg_data *) msg;
-                       cldlog(LOG_DEBUG,
-                              "    msg op %s, seqid %llu, seg %u, len %u",
-                              opstr(msg->op),
-                              (unsigned long long) GUINT64_FROM_LE(pkt->seqid),
-                              GUINT32_FROM_LE(dp->seg),
-                              GUINT32_FROM_LE(dp->seg_len));
-               } else if (msg->op == cmo_put) {
+               if (have_put) {
                        struct cld_msg_put *dp = (struct cld_msg_put *) msg;
                        cldlog(LOG_DEBUG, "    msg op %s, seqid %llu, size %u",
                               opstr(msg->op),
                               (unsigned long long) GUINT64_FROM_LE(pkt->seqid),
                               GUINT32_FROM_LE(dp->data_size));
-               } else {
+               } else if (first_frag) {
                        cldlog(LOG_DEBUG, "    msg op %s, seqid %llu",
                               opstr(msg->op),
                               (unsigned long long) 
GUINT64_FROM_LE(pkt->seqid));
+               } else {
+                       cldlog(LOG_DEBUG, "    seqid %llu",
+                              (unsigned long long) 
GUINT64_FROM_LE(pkt->seqid));
                }
        }
 
-       if (msg->op != cmo_new_sess) {
+       /* advance sequence id's and update last-contact timestamp */
+       if (!have_new_sess) {
                if (!sess) {
                        resp_rc = CLE_SESS_INVAL;
                        goto err_out;
@@ -337,7 +338,7 @@ static void udp_rx(struct server_socket *sock,
                sess->last_contact = current_time.tv_sec;
                sess->sock = sock;      /* FIXME refcount for changed sockets */
 
-               if (msg->op != cmo_ack) {
+               if (!have_ack) {
                        /* eliminate duplicates; do not return any response */
                        if (GUINT64_FROM_LE(pkt->seqid) != sess->next_seqid_in) 
{
                                if (debugging)
@@ -362,6 +363,30 @@ static void udp_rx(struct server_socket *sock,
                }
        }
 
+       /* copy message fragment into reassembly buffer */
+       if (sess) {
+               if (pkt_flags & CPF_FIRST)
+                       sess->msg_buf_len = 0;
+
+               if ((sess->msg_buf_len + mp.msg_len) > CLD_MAX_MSG_SZ) {
+                       resp_rc = CLE_BAD_PKT;
+                       goto err_out;
+               }
+
+               memcpy(&sess->msg_buf[sess->msg_buf_len], msg, mp.msg_len);
+               sess->msg_buf_len += mp.msg_len;
+
+               if (!(pkt_flags & CPF_LAST))
+                       return;
+
+               mp.msg = msg = (struct cld_msg_hdr *) sess->msg_buf;
+               mp.msg_len = sess->msg_buf_len;
+
+               if (debugging)
+                       cldlog(LOG_DEBUG, "    final message size %u",
+                              sess->msg_buf_len);
+       }
+
        udp_rx_msg(cli, pkt, msg, &mp);
        return;
 
diff --git a/server/session.c b/server/session.c
index be9bb1e..8f858cc 100644
--- a/server/session.c
+++ b/server/session.c
@@ -67,15 +67,17 @@ void pkt_init_pkt(struct cld_packet *dest, const struct 
cld_packet *src)
        memcpy(dest->magic, CLD_PKT_MAGIC, CLD_MAGIC_SZ);
        dest->seqid = GUINT64_TO_LE(0xdeadbeef);
        memcpy(dest->sid, src->sid, CLD_SID_SZ);
+       dest->flags = GUINT32_TO_LE(CPF_FIRST | CPF_LAST);
        strncpy(dest->user, src->user, CLD_MAX_USERNAME - 1);
 }
 
-void pkt_init_sess(struct cld_packet *dest, struct session *sess)
+static void pkt_init_sess(struct cld_packet *dest, struct session *sess)
 {
        memset(dest, 0, sizeof(*dest));
        memcpy(dest->magic, CLD_PKT_MAGIC, CLD_MAGIC_SZ);
        dest->seqid = next_seqid_le(&sess->next_seqid_out);
        memcpy(dest->sid, sess->sid, CLD_SID_SZ);
+       dest->flags = 0;
        strncpy(dest->user, sess->user, CLD_MAX_USERNAME - 1);
 }
 
@@ -556,14 +558,29 @@ static void session_retry(struct timer *timer)
        timer_add(&sess->retry_timer, time(NULL) + CLD_RETRY_START);
 }
 
+static void session_outq(struct session *sess, GList *new_pkts)
+{
+       /* if out_q empty, start retry timer */
+       if (!sess->out_q)
+               timer_add(&sess->retry_timer, time(NULL) + CLD_RETRY_START);
+
+       sess->out_q = g_list_concat(sess->out_q, new_pkts);
+}
+
 bool sess_sendmsg(struct session *sess, const void *msg_, size_t msglen,
                  void (*done_cb)(struct session_outpkt *),
                  void *done_data)
 {
        struct cld_packet *outpkt;
-       struct cld_msg_hdr *msg;
-       struct session_outpkt *op;
-       size_t pkt_len;
+       unsigned int n_pkts, i;
+       size_t pkt_len, msg_left = msglen;
+       struct session_outpkt *pkts[CLD_MAX_PKT_MSG], *op;
+       GList *tmp_root = NULL;
+       const void *p;
+       bool first_frag = true;
+
+       n_pkts = (msglen / CLD_MAX_PKT_MSG_SZ);
+       n_pkts += (msglen % CLD_MAX_PKT_MSG_SZ) ? 1 : 0;
 
        if (debugging) {
                const struct cld_msg_hdr *hdr = msg_;
@@ -581,7 +598,6 @@ bool sess_sendmsg(struct session *sess, const void *msg_, 
size_t msglen,
                case cmo_new_sess:
                case cmo_end_sess:
                case cmo_open:
-               case cmo_data_s:
                case cmo_get_meta:
                case cmo_get:
                        rsp = (struct cld_msg_resp *) msg_;
@@ -601,48 +617,91 @@ bool sess_sendmsg(struct session *sess, const void *msg_, size_t msglen,
                }
        }

-       op = op_alloc(sizeof(*outpkt) + msglen + SHA_DIGEST_LENGTH);
-       if (!op)
+       if (!n_pkts || n_pkts > CLD_MAX_PKT_MSG)
                return false;

-       op->sess = sess;
-       op->done_cb = done_cb;
-       op->done_data = done_data;
+       /* slots past a failed op_alloc() must read as NULL, so that
+        * err_out releases exactly the packets that were allocated
+        */
+       memset(pkts, 0, sizeof(pkts));
+
+       /* pass 1: perform allocations */
+       for (i = 0; i < n_pkts; i++) {
+               pkts[i] = op = op_alloc(sizeof(*outpkt) +
+                                       CLD_MAX_PKT_MSG_SZ +
+                                       SHA_DIGEST_LENGTH);
+               if (!op)
+                       goto err_out;

-       outpkt = op->pkt;
-       pkt_len = op->pkt_len;
+               tmp_root = g_list_append(tmp_root, op);
+       }

-       msg = (struct cld_msg_hdr *) (outpkt + 1);
+       /* pass 2: fill and sign every fragment.  Nothing is transmitted
+        * until all fragments are signed, so a signing failure never
+        * frees packets that are already on the wire.
+        */
+       p = msg_;
+       for (i = 0; i < n_pkts; i++) {
+               void *outmsg_mem;
+               size_t copy_len;

-       /* init packet header */
-       pkt_init_sess(outpkt, sess);
+               op = pkts[i];

-       /* init message header */
-       memcpy(msg->magic, CLD_MSG_MAGIC, CLD_MAGIC_SZ);
-       msg->op = ((struct cld_msg_hdr *)msg_)->op;
+               op->sess = sess;
+               outpkt = op->pkt;
+               outmsg_mem = (outpkt + 1);

-       /* copy message trailer */
-       memcpy(msg + 1, msg_ + sizeof(*msg), msglen - sizeof(*msg));
+               /* init packet header; pkt_init_sess() clears the flags */
+               pkt_init_sess(outpkt, sess);

-       op->pkt = outpkt;
-       op->pkt_len = pkt_len;
-       op->next_retry = current_time.tv_sec + CLD_RETRY_START;
+               if (first_frag) {
+                       first_frag = false;
+                       outpkt->flags |= GUINT32_TO_LE(CPF_FIRST);
+               }

-       if (!authsign(outpkt, pkt_len)) {
-               op_unref(op);
-               return false;
-       }
+               /* at most CLD_MAX_PKT_MSG_SZ message bytes per fragment,
+                * matching how n_pkts was computed; the SHA digest
+                * trailer follows the data and is written by authsign()
+                */
+               copy_len = MIN((size_t) CLD_MAX_PKT_MSG_SZ, msg_left);
+               memcpy(outmsg_mem, p, copy_len);
+               p += copy_len;
+               msg_left -= copy_len;

-       /* if out_q empty, start retry timer */
-       if (!sess->out_q)
-               timer_add(&sess->retry_timer, time(NULL) + CLD_RETRY_START);
+               op->pkt_len = pkt_len = sizeof(*outpkt) + copy_len +
+                                       SHA_DIGEST_LENGTH;

-       sess->out_q = g_list_append(sess->out_q, op);
+               if (!msg_left) {
+                       op->done_cb = done_cb;
+                       op->done_data = done_data;

-       udp_tx(sess->sock, (struct sockaddr *) &sess->addr,
-              sess->addr_len, outpkt, pkt_len);
+                       outpkt->flags |= GUINT32_TO_LE(CPF_LAST);
+               }
+
+               op->next_retry = current_time.tv_sec + CLD_RETRY_START;
+
+               if (!authsign(outpkt, pkt_len))
+                       goto err_out;
+       }
+
+       /* pass 3: every fragment is signed; transmit them in order */
+       for (i = 0; i < n_pkts; i++)
+               udp_tx(sess->sock, (struct sockaddr *) &sess->addr,
+                      sess->addr_len, pkts[i]->pkt, pkts[i]->pkt_len);
+
+       session_outq(sess, tmp_root);

        return true;
+
+err_out:
+       /* nothing has been transmitted yet, so every allocated
+        * packet may be released; unallocated slots are NULL
+        */
+       for (i = 0; i < n_pkts; i++)
+               if (pkts[i])
+                       op_unref(pkts[i]);
+       g_list_free(tmp_root);
+       return false;
 }

 void msg_ack(struct msg_params *mp)
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to