RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  [email protected]
  Module: rpm                              Date:   28-May-2017 20:01:53
  Branch: rpm-5_4                          Handle: 2017052818015201

  Modified files:           (Branch: rpm-5_4)
    rpm                     CHANGES
    rpm/rpmio               msqio.c poptIO.c rpmio.c rpmmalloc.c rpmmsq.h
                            tmq.c

  Log:
    - msqio: replace pthreads monitor with yarn.

  Summary:
    Revision    Changes     Path
    1.3501.2.564+1  -0      rpm/CHANGES
    1.1.2.15    +422 -194   rpm/rpmio/msqio.c
    1.94.2.31   +6  -4      rpm/rpmio/poptIO.c
    1.230.2.53  +1  -3      rpm/rpmio/rpmio.c
    1.29.2.8    +1  -1      rpm/rpmio/rpmmalloc.c
    1.1.2.12    +66 -10     rpm/rpmio/rpmmsq.h
    1.1.2.11    +65 -51     rpm/rpmio/tmq.c
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/CHANGES
  ============================================================================
  $ cvs diff -u -r1.3501.2.563 -r1.3501.2.564 CHANGES
  --- rpm/CHANGES       27 May 2017 14:22:17 -0000      1.3501.2.563
  +++ rpm/CHANGES       28 May 2017 18:01:52 -0000      1.3501.2.564
  @@ -1,4 +1,5 @@
   5.4.17 -> 5.4.18:
  +    - jbj: msqio: replace pthreads monitor with yarn.
       - jbj: rpmio: wire up the fooInit callback, pass the message to fooDbug.
       - jbj: rpmio: stub in fooDbug and fooInit callbacks for pools.
       - jbj: msqio: add a monitor to handle multiple readers/writers.
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/msqio.c
  ============================================================================
  $ cvs diff -u -r1.1.2.14 -r1.1.2.15 msqio.c
  --- rpm/rpmio/msqio.c 27 May 2017 14:22:18 -0000      1.1.2.14
  +++ rpm/rpmio/msqio.c 28 May 2017 18:01:52 -0000      1.1.2.15
  @@ -22,6 +22,7 @@
   #include <rpmmacro.h>
   #include <rpmcb.h>           /* XXX rpmIsDebug() */
   #include <rpmzlog.h>         /* XXX rpmzLog type */
  +#include <yarn.h>
   
   #define      _RPMMSQ_INTERNAL
   #include "rpmmsq.h"
  @@ -39,50 +40,31 @@
   
   #define      MSQONLY(fd)     assert(fdGetIo(fd) == msqio)
   
  -static int _lockdebug = 0;
  -static int _conddebug = -1;
   #define Z(_rc)  assert((_rc) == 0)
  -#define LOCK(_m)  \
  -    {        if (_lockdebug) \
  -         rpmzLogAdd(msq->zlog, "***  LOCKING(%p) %s", &_m, #_m); \
  -     Z(pthread_mutex_lock(&_m)); \
  -    }
  -#define UNLOCK(_m) \
  -    {        Z(pthread_mutex_unlock(&_m)); \
  -     if (_lockdebug) \
  -         rpmzLogAdd(msq->zlog, "*** UNLOCKED(%p) %s", &_m, #_m); \
  -    }
  -#define SIGNAL(_c) \
  -    {        if (_conddebug) \
  -         rpmzLogAdd(msq->zlog, "***   SIGNAL(%p) %s", &_c, #_c); \
  -     Z(pthread_cond_broadcast(&_c)); \
  -    }
  -#define WAIT(_c,_m) \
  -    {        if (_conddebug) \
  -         rpmzLogAdd(msq->zlog, "***     WAIT(%p) %s", &_c, #_c); \
  -     Z(pthread_cond_wait(&_c, &_m)); \
  -    }
   
  -#ifdef       NOTYET
  -static int _yarndebug = -1;
  +static int _lockdebug = 0;
  +static int _conddebug = -1;
   #define PEEK(_bolt)  yarnPeekLock(_bolt)
   #define      POSSESS(_bolt) \
       {        yarnPossess(_bolt); \
  -     if (_yarndebug) fprintf(stderr, "***  POSSESS(%p)\t\t%ld\n", _bolt, PEEK(_bolt));\
  +     if (_lockdebug) \
  +         rpmzLogAdd(msq->zlog, "***\t  POSSESS(%p)\t%ld", _bolt, PEEK(_bolt)); \
       }
   #define      RELEASE(_bolt) \
  -    {        if (_yarndebug) fprintf(stderr, "***  RELEASE(%p)\t\t%ld\n", _bolt, PEEK(_bolt));\
  +    {        if (_lockdebug) \
  +         rpmzLogAdd(msq->zlog, "***\t  RELEASE(%p)\t%ld", _bolt, PEEK(_bolt)); \
        yarnRelease(_bolt); \
       }
   #define      TWIST(_bolt, _op, _val) \
  -    {        if (_yarndebug) fprintf(stderr, "***    TWIST(%p, %d, %d)\t%ld\n", _bolt, _op, _val, PEEK(_bolt));\
  +    {        if (_conddebug) \
  +         rpmzlogAdd(msq->zlog, "***\t    TWIST(%p, %d, %d)\t%ld", _bolt, _op, _val, PEEK(_bolt)); \
        yarnTwist(_bolt, _op, _val); \
       }
   #define      WAITFOR(_bolt, _op, _val) \
  -    {        if (_yarndebug) fprintf(stderr, "***  WAITFOR(%p, %d, %d)\t%ld\n", _bolt, _op, _val, PEEK(_bolt));\
  +    {        if (_conddebug) \
  +         rpmzLogAdd(msq->zlog, "***\t  WAITFOR(%p, %d, %d)\t%ld", _bolt, _op, _val, PEEK(_bolt));\
        yarnWaitFor(_bolt, _op, _val); \
       }
  -#endif
   
   #ifdef __cplusplus
   GENfree(rpmmsq)
  @@ -347,66 +329,142 @@
        PRINT(d, msgsize);
   #endif       /* WITH_MSQ */
        if (stats) {
  -         LOCK(msq->m);
  -         PRINT(d, inflight);
  +         PRINT(d, nqueued);
            PRINT(d, nsent);
            PRINT(d, nrecv);
            PRINT(d, ntimeout);
            PRINT(d, nagain);
  -         UNLOCK(msq->m);
        }
  +     PRINT(p, head);
  +     PRINT(p, tail);
  +     PRINT(d, nqueued);
   #undef       PRINT
       }
   }
   
   /* =============================================================== */
  -static void rpmmsqFini(void *_msq)
  +int _rpmaio_debug = 0;
  +
  +static char *LIO_[] = { "READ", "WRITE", "NOP", "DSYNC", "SYNC", "CLOSE" };
  +static char * rpmaioDbug(void *_aio, char *b, size_t nb)
   {
  -    rpmmsq msq = (rpmmsq) _msq;
  +    rpmaio aio = (rpmaio) _aio;
  +    size_t len = strlen(b);
  +    char * be = b + len;
  +    rpmioItem item = (rpmioItem) aio;
  +    long use;
   
  -    if (msq) {
  -SPEW("%s: inflight %d:%d sent %d recv %d timeout %d again %d\n", __FUNCTION__, msq->inflight, msq->msgmax, msq->nsent, msq->nrecv, msq->ntimeout, msq->nagain);
  -     msq->msgmax = MSQ_MSGMAX;
  -     msq->msgsize = MSQ_MSGSIZE;
  +    if (aio && (use = PEEK(item->use)) > 1) {
  +     int colorize = isatty(fileno(stderr));
  +#define ANSI_BRIGHT_BLUE     "\x1b[34;1m"
  +#define ANSI_RESET           "\x1b[0m"
  +     *be++ = '\n';
  +     if (colorize) be = stpcpy(be, ANSI_BRIGHT_BLUE);
  +     be += sprintf(be, "========================== aio(%p) use %ld ix %d\n",
  +             aio, use, aio->ix);
  +#define PRINT_AIO(_fmt, _foo) \
  +    {        be += sprintf(be, "%25s: %"#_fmt"\n", #_foo, aio->cb._foo); }
  +#define PRINT_AIO_SIGEV(_fmt, _foo) \
  +    {        be += sprintf(be, "%25s: %"#_fmt"\n", #_foo, aio->cb.aio_sigevent._foo); }
  +     PRINT_AIO(d, aio_fildes);
  +     be += sprintf(be, "%25s: %s\n", "aio_lio_opcode",
  +             LIO_[aio->cb.aio_lio_opcode % (sizeof(LIO_)/sizeof(LIO_[0]))]);
  +     if (aio->cb.aio_reqprio)
  +         PRINT_AIO(d, aio_reqprio);
  +     PRINT_AIO(p, aio_buf);
  +     PRINT_AIO(zd, aio_nbytes);
  +     switch (aio->cb.aio_sigevent.sigev_notify) {
  +     default:
  +         break;
  +     case SIGEV_THREAD:
  +         PRINT_AIO_SIGEV(p, sigev_value.sival_ptr);
  +         PRINT_AIO_SIGEV(d, sigev_signo);
  +         PRINT_AIO_SIGEV(d, sigev_notify);
  +         PRINT_AIO_SIGEV(p, sigev_notify_function);
  +         PRINT_AIO_SIGEV(p, sigev_notify_attributes);
  +         break;
  +     }
  +#undef       PRINT_AIO_SIGEV
  +#undef       PRINT_AIO
  +     be--;
  +     if (colorize) be = stpcpy(be, ANSI_RESET);
  +     *be = '\0';
  +    }
  +    return b;
  +}
   
  -     Z(pthread_mutex_destroy(&msq->m));
  -     Z(pthread_cond_destroy(&msq->e));
  -     Z(pthread_cond_destroy(&msq->f));
  -     msq->inflight = 0;
  -     msq->nsent = 0;
  -     msq->nrecv = 0;
  -     msq->ntimeout = 0;
  -     msq->nagain = 0;
  +static void rpmaioInit(void *_aio)
  +{
  +    rpmaio aio = (rpmaio) _aio;
   
  -     msq->flags = 0;
  -     msq->qname = _free(msq->qname);
  -     msq->fmode = _free(msq->fmode);
  -     msq->fdno = -1;
  -     msq->qid = -1;
  -     msq->oflags = 0;
  -     msq->omode = 0;
  -     msq->key = 0;
  -     msq->mtype = 0;
  +    if (aio) {
  +     aio->cb.aio_fildes = 0;
  +     aio->cb.aio_lio_opcode = 0;
  +     aio->cb.aio_reqprio = 0;
  +     aio->cb.aio_buf = NULL;
  +     aio->cb.aio_nbytes = 0;
  +     aio->ix = -1;
  +    }
  +}
   
  -     if (msq->zlog)
  -         msq->zlog = rpmzLogDump(msq->zlog, NULL);
  +static void rpmaioFini(void *_aio)
  +{
  +    rpmaio aio = (rpmaio) _aio;
  +
  +    if (aio) {
  +     aio->cb.aio_fildes = 0;
  +     aio->cb.aio_lio_opcode = 0;
  +     aio->cb.aio_reqprio = 0;
  +     aio->cb.aio_buf = NULL;
  +     aio->cb.aio_nbytes = 0;
  +     aio->ix = -1;
       }
   }
   
  +RPMIOPOOL_MODULE(aio)
  +
  +rpmaio rpmaioNew(int fdno, int op, int prio, void *b, size_t nb)
  +{
  +    rpmaio aio = rpmaioGetPool(_rpmaioPool);
  +
  +    aio->_item.next = NULL;          /* XXX rpmmalloc.c ? */
  +    aio->cb.aio_fildes = fdno;
  +    aio->cb.aio_lio_opcode = op;
  +    aio->cb.aio_reqprio = prio;
  +    aio->cb.aio_buf = b;
  +    aio->cb.aio_nbytes = nb;
  +
  +    aio->cb.__error_code = EINPROGRESS;      /* XXX */
  +    aio->cb.__return_value = 0;              /* XXX */
  +
  +    aio->ix = -1;
  +
  +    return rpmaioLink(aio);
  +}
  +
  +/* =============================================================== */
   static char * rpmmsqDbug(void *_msq, char *b, size_t nb)
   {
       rpmmsq msq = (rpmmsq) _msq;
       size_t len = strlen(b);
       char * be = b + len;
  +    rpmioItem item = (rpmioItem) msq;
  +    long use;
   
  -    if (msq) {
  +    if (msq && (use = PEEK(item->use)) > 0) {
        int colorize = isatty(fileno(stderr));
   #define ANSI_BRIGHT_BLUE     "\x1b[34;1m"
   #define ANSI_RESET           "\x1b[0m"
        *be++ = '\n';
        if (colorize) be = stpcpy(be, ANSI_BRIGHT_BLUE);
  +     be += sprintf(be, "========================== msq(%p) use %ld\n",
  +             msq, use);
   #define PRINT(_fmt, _foo) \
  -    {        be += sprintf(be, "%19s: %"#_fmt"\n", #_foo, msq->_foo); }
  +    {        be += sprintf(be, "%25s: %"#_fmt"\n", #_foo, msq->_foo); }
  +#define PRINT_ATTRS(_fmt, _foo) \
  +    {        be += sprintf(be, "%25s: %"#_fmt"\n", #_foo, msq->oattrs._foo); }
  +#define PRINT_SIGEV(_fmt, _foo) \
  +    {        be += sprintf(be, "%25s: %"#_fmt"\n", #_foo, msq->sigev._foo); }
        PRINT(s, qname);
        PRINT(s, fmode);
        PRINT(d, fdno);
  @@ -418,13 +476,32 @@
        PRINT(ld, mtype);
        PRINT(d, msgmax);
        PRINT(d, msgsize);
  -     LOCK(msq->m);
  -     PRINT(d, inflight);
  +     PRINT(d, nqueued);
        PRINT(d, nsent);
        PRINT(d, nrecv);
        PRINT(d, ntimeout);
        PRINT(d, nagain);
  -     UNLOCK(msq->m);
  +     PRINT_ATTRS(ld, mq_flags);
  +     PRINT_ATTRS(ld, mq_maxmsg);
  +     PRINT_ATTRS(ld, mq_msgsize);
  +     PRINT_ATTRS(ld, mq_curmsgs);
  +     switch (msq->sigev.sigev_notify) {
  +     default:
  +         break;
  +     case SIGEV_THREAD:
  +         PRINT_SIGEV(p, sigev_value.sival_ptr);
  +         PRINT_SIGEV(d, sigev_signo);
  +         PRINT_SIGEV(d, sigev_notify);
  +         PRINT_SIGEV(p, sigev_notify_function);
  +         PRINT_SIGEV(p, sigev_notify_attributes);
  +         break;
  +     }
  +     PRINT(p, head);
  +     PRINT(p, tail);
  +     PRINT(d, nqueued);
  +
  +#undef       PRINT_SIGEV
  +#undef       PRINT_ATTRS
   #undef       PRINT
        be--;
        if (colorize) be = stpcpy(be, ANSI_RESET);
  @@ -435,8 +512,53 @@
   
   static void rpmmsqInit(void *_msq)
   {
  +}
  +
  +static void rpmmsqFini(void *_msq)
  +{
       rpmmsq msq = (rpmmsq) _msq;
  -fprintf(stderr, "<== %s(%p)\n", __FUNCTION__, msq);
  +
  +    if (msq) {
  +SPEW("%s: queued %d sent %d recv %d timeout %d again %d\n", __FUNCTION__, msq->nqueued, msq->nsent, msq->nrecv, msq->ntimeout, msq->nagain);
  +     msq->msgmax = MSQ_MSGMAX;
  +     msq->msgsize = MSQ_MSGSIZE;
  +
  +     msq->nsent = 0;
  +     msq->nrecv = 0;
  +     msq->ntimeout = 0;
  +     msq->nagain = 0;
  +
  +     msq->flags = 0;
  +     msq->qname = _free(msq->qname);
  +     msq->fmode = _free(msq->fmode);
  +     msq->fdno = -1;
  +     msq->qid = -1;
  +     msq->oflags = 0;
  +     msq->omode = 0;
  +     msq->key = 0;
  +     msq->mtype = 0;
  +
  +     /* Drain the queue. */
  +     while (1) {
  +         rpmioItem item;
  +         RPM_GNUC_TM_ATOMIC {
  +             if ((item = msq->head) == NULL)
  +                 break;
  +             item = msq->head;
  +             msq->head = item->next;
  +             item->next = NULL;              /* XXX rpmmalloc.c ? */
  +             if (msq->head == NULL)
  +                 msq->tail = &msq->head;
  +         }
  +         while (item)
  +             item = rpmioFreePoolItem(item,
  +                     __FUNCTION__, __FILE__, __LINE__);
  +     }
  +     msq->nqueued = 0;
  +
  +     if (msq->zlog)
  +         msq->zlog = rpmzLogDump(msq->zlog, NULL);
  +    }
   }
   
   RPMIOPOOL_MODULE(msq)
  @@ -486,10 +608,6 @@
            oflags |= O_CLOEXEC;
            continue;
            break;
  -     case 'l':       /* XXX loopback mode */
  -         flags |= RPMMSQ_FLAGS_LOOP;
  -         continue;
  -         break;
        case 'n':       /* XXX O_NONBLOCK */
            oflags |= O_NONBLOCK;
            continue;
  @@ -548,12 +666,12 @@
       msq->msgmax = MSQ_MSGMAX;
       msq->msgsize = MSQ_MSGSIZE;
   
  -    /* Initialize the monitor. */
       /* XXX TODO: use POSIX shared mutexes. */
  -    Z(pthread_mutex_init(&msq->m, NULL));
  -    Z(pthread_cond_init(&msq->e, NULL));
  -    Z(pthread_cond_init(&msq->f, NULL));
  -    msq->inflight = 0;
  +    RPM_GNUC_TM_ATOMIC {
  +     msq->head = NULL;
  +     msq->tail = &msq->head;
  +    }
  +    msq->nqueued = 0;
       msq->nsent = 0;
       msq->nrecv = 0;
       msq->ntimeout = 0;
  @@ -572,13 +690,11 @@
       msq->qname = rpmGetPath("/", path, NULL);
   
       key_t key = 0;
  -    if (!strcmp(msq->qname, "/private")
  -     || !strcmp(msq->qname, "/IPC_PRIVATE")) {
  +    if (!strcmp(msq->qname, "/private")) {
        key = IPC_PRIVATE;
        msq->flags |= RPMMSQ_FLAGS_DELETE;      /* XXX delete on close. */
       } else
  -    if (!strcmp(msq->qname, "/program")
  -     || !strcmp(msq->qname, "/__progname_key")) {
  +    if (!strcmp(msq->qname, "/program")) {
        key = __progname_key;
       } else {
        const char * lpath = msq->qname;
  @@ -586,7 +702,7 @@
        if (!strcmp(lpath, "/rpm"))
             lpath = "/usr/lib/rpm";
        key = ftok(lpath, projid);
  -        if (key == -1) {
  +     if (key == -1) {
            int lvl = RPMLOG_WARNING;
            rpmlog(lvl, "%s: ftok(%s,0x%02x) failed: %m\n",
                __FUNCTION__, path, (projid & 0xff));
  @@ -627,26 +743,20 @@
       {
   #if defined(WITH_MQ)
        /* XXX put *open in a monitor. */
  -     struct mq_attr _attrs = {
  -         .mq_flags = (oflags & O_NONBLOCK),
  -         .mq_maxmsg = msq->msgmax,
  -         .mq_msgsize = msq->msgsize,
  -         .mq_curmsgs = 0,
  -     }, *attrs = &_attrs;
  -     msq->qid = Mq_open(msq->qname, msq->oflags, msq->omode, attrs);
  -
  -     if (msq->qid != -1 && MSQF_ISSET(LOOP)) {
  -         pthread_attr_t attr;
  -         Z(pthread_getattr_default_np(&attr));
  -         Z(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED));
  -         struct sigevent sigev = {
  -             .sigev_notify = SIGEV_THREAD,
  -             .sigev_notify_function = rpmmsqReader,
  -             .sigev_notify_attributes = &attr,
  -             .sigev_value.sival_ptr = msq,
  -         };
  -         Z(rpmmsqNotify(msq, &sigev));
  -     }
  +     msq->oattrs.mq_flags = (oflags & O_NONBLOCK);
  +     msq->oattrs.mq_maxmsg = msq->msgmax;
  +     msq->oattrs.mq_msgsize = msq->msgsize;
  +     msq->oattrs.mq_curmsgs = 0;
  +     msq->qid = Mq_open(msq->qname, msq->oflags, msq->omode, &msq->oattrs);
  +
  +     Z(pthread_getattr_default_np(&msq->attr));
  +     Z(pthread_attr_setdetachstate(&msq->attr, PTHREAD_CREATE_DETACHED));
  +     msq->sigev.sigev_notify = SIGEV_THREAD;
  +     msq->sigev.sigev_notify_function = rpmmsqReader;
  +     msq->sigev.sigev_notify_attributes = &msq->attr;
  +     msq->sigev.sigev_value.sival_ptr = msq;
  +     if (msq->qid != -1)
  +         Z(rpmmsqNotify(msq, &msq->sigev));
   #endif       /* WITH_MQ */
       }        break;
       case RPMMSQ_TYPE_SYSV:
  @@ -675,20 +785,29 @@
       case RPMMSQ_TYPE_POSIX:
       {
   #if defined(WITH_MQ)
  -     if (MSQF_ISSET(LOOP)) return 0; /* XXX EOF with loopback service. */
  -
        unsigned int prio = 0;
   
  -     /* Consumer in monitor. */
  -     LOCK(msq->m);
  -     while (msq->inflight == 0)
  -         WAIT(msq->e, msq->m);
  -     rc = Mq_receive(msq->qid, buf, count, &prio);
  +     rpmaio aio = rpmaioNew(msq->qid, LIO_READ, 0, buf, count);
  +     aio->ix = ++msq->nqueued;
  +
  +     rpmioItem item = rpmioLinkPoolItem(&aio->_item,
  +                     __FUNCTION__, __FILE__, __LINE__);
  +
  +     /* Queue the read. */
  +     POSSESS(item->use);
  +     RPM_GNUC_TM_ATOMIC {
  +         *msq->tail = item;
  +         msq->tail = (rpmioItem *) &item->next;
  +     }
  +     WAITFOR(item->use, TO_BE, 1);
  +     RELEASE(item->use);
  +
  +     rc = aio->cb.__return_value;            /* XXX aio_return */
        if (rc >= 0)
  -         msq->nrecv++;
  -     if (msq->inflight-- == msq->msgmax)
  -         SIGNAL(msq->f);
  -     UNLOCK(msq->m);
  +         prio = aio->cb.aio_reqprio;         /* XXX prio */
  +     else
  +         errno = aio_error(&aio->cb);
  +     aio = rpmaioFree(aio);
   
        if (rc >= 0) {
            if (priop)
  @@ -719,7 +838,7 @@
        break;
       }
   
  -SPEW("<== %s(%p,%p[%lu],%p) qid %d rc %ld *priop %lu\n", __FUNCTION__, msq, buf, count, priop, (msq ? msq->qid : -1), rc, (priop ? *priop : 0));
  +SPEW("<==\t%s(%p,%p[%lu],%p) qid %d rc %ld *priop %lu\n", __FUNCTION__, msq, buf, count, priop, (msq ? msq->qid : -1), rc, (priop ? *priop : 0));
       return rc;
   }
   
  @@ -734,24 +853,38 @@
       {
   #if defined(WITH_MQ)
   
  -     /* Producer in monitor. */
  -     LOCK(msq->m);
  -     while (msq->inflight == msq->msgmax)
  -         WAIT(msq->f, msq->m);
  -     rc = Mq_send(msq->qid, buf, count, prio);
  -     if (msq->inflight++ == 0)
  -         SIGNAL(msq->e);
  -     UNLOCK(msq->m);
  +     if (msq->nsent == 0) {
  +         rc = Mq_send(msq->qid, buf, count, prio);
  +         if (rc == 0)
  +             msq->nsent++;
   
  -     /* (loopback mode) Wait for the reader thread. */
  -     if (msq->nsent == 0 && MSQF_ISSET(LOOP)) {
            /* Yield the CPU. */
  +         sched_yield();
            struct timespec ts = { 0, 100*1000 };
            Z(nanosleep(&ts, NULL));
  -     }
  +         /* Wait for the reader thread. */
  +     } else {
  +         rpmaio aio = rpmaioNew(msq->qid, LIO_WRITE, prio, (char *)buf, count);
  +         aio->ix = ++msq->nqueued;
  +         aio->cb.aio_reqprio = prio; /* XXX prio */
  +
  +         rpmioItem item = rpmioLinkPoolItem(&aio->_item,
  +                     __FUNCTION__, __FILE__, __LINE__);
  +
  +         /* Queue the write. */
  +         POSSESS(item->use);
  +         RPM_GNUC_TM_ATOMIC {
  +             *msq->tail = item;
  +             msq->tail = (rpmioItem *) &item->next;
  +         }
  +         WAITFOR(item->use, TO_BE, 1);
  +         RELEASE(item->use);
   
  -     if (rc == 0)
  -         msq->nsent++;
  +         rc = aio->cb.__return_value;        /* XXX aio_return */
  +         if (rc < 0)
  +             errno = aio_error(&aio->cb);
  +         aio = rpmaioFree(aio);
  +     }
   
        if (rc == 0)    /* XXX remap to write(2) return */
            rc = count;
  @@ -781,7 +914,7 @@
        break;
       }
   
  -SPEW("<== %s(%p,%p[%lu],%lu) qid %d rc %ld\n", __FUNCTION__, msq, buf, count, prio, (msq ? msq->qid : -1), rc);
  +SPEW("<==\t%s(%p,%p[%lu],%lu) qid %d rc %ld\n", __FUNCTION__, msq, buf, count, prio, (msq ? msq->qid : -1), rc);
       return rc;
   }
   
  @@ -796,7 +929,7 @@
       off_t p = pos;
   #endif
       int rc = -2;     /* assume failure */
  -SPEW("<== %s(%p, %ld, SEEK_%s) qid %d rc %d\n", __FUNCTION__, msq, p, SEEK_[whence&0x3], (msq ? msq->qid : -1), rc);
  +SPEW("<==\t%s(%p, %ld, SEEK_%s) qid %d rc %d\n", __FUNCTION__, msq, p, SEEK_[whence&0x3], (msq ? msq->qid : -1), rc);
       return rc;
   }
   
  @@ -804,7 +937,7 @@
   {
       int rc = -2;     /* assume failure */
   
  -SPEW("==> %s(%p,%d) qid %d\n", __FUNCTION__, msq, delete, (msq ? msq->qid : -1));
  +SPEW("==>\t%s(%p,%d) qid %d\n", __FUNCTION__, msq, delete, (msq ? msq->qid : -1));
       if (msq)
       switch (msq->flags & RPMMSQ_TYPE_MASK) {
       case RPMMSQ_TYPE_DEFAULT:
  @@ -812,21 +945,64 @@
       {
   #if defined(WITH_MQ)
        if (msq->qid != -1) {
  +
            if (MSQF_ISSET(INFO) || _rpmmsq_debug)
                rpmmsqDump(NULL, msq, NULL);
   
  -         /* Producer in monitor. */
  -         LOCK(msq->m);
  -         while (msq->inflight == msq->msgmax)
  -             WAIT(msq->f, msq->m);
  +#ifdef       DYING
  +         /* Wait for the queue to drain. */
  +         while (1) {
  +             rpmioItem item;
  +             RPM_GNUC_TM_ATOMIC {
  +                 if ((item = msq->head) == NULL)
  +                     break;
  +             }
  +             POSSESS(item->use);
  +             WAITFOR(item->use, TO_BE, 0);
  +             RELEASE(item->use);
  +         }
  +
            /* Turn off the sigev detached thread. */
  -         if (MSQF_ISSET(LOOP))
  -             rc = rpmmsqNotify(msq, NULL);
  -         rc = Mq_close(msq->qid);
  +         int qid = msq->qid;
            msq->qid = -1;
  -         if (msq->inflight++ == 0)
  -             SIGNAL(msq->e);
  -         UNLOCK(msq->m);
  +         rc = rpmmsqNotify(msq, NULL);
  +
  +         /* Yield the CPU. */
  +         sched_yield();
  +         struct timespec ts = { 0, 100*1000 };
  +         Z(nanosleep(&ts, NULL));
  +         /* Wait for the reader thread. */
  +
  +#else
  +         if (msq->nsent == 0)  {
  +             rc = rpmmsqNotify(msq, NULL);
  +             rc = Mq_close(msq->qid);
  +             if (rc == 0)
  +                 msq->qid = -1;
  +             break;
  +         }
  +         unsigned prio = 0;
  +         rpmaio aio = rpmaioNew(msq->qid, LIO_CLOSE, prio, NULL, 0);
  +         aio->ix = ++msq->nqueued;
  +         aio->cb.aio_reqprio = prio; /* XXX prio */
  +
  +         rpmioItem item = rpmioLinkPoolItem(&aio->_item,
  +                     __FUNCTION__, __FILE__, __LINE__);
  +
  +         /* Queue the write. */
  +         POSSESS(item->use);
  +         RPM_GNUC_TM_ATOMIC {
  +             *msq->tail = item;
  +             msq->tail = (rpmioItem *) &item->next;
  +         }
  +         WAITFOR(item->use, TO_BE, 1);
  +         RELEASE(item->use);
  +
  +         rc = aio->cb.__return_value;        /* XXX aio_return */
  +         if (rc < 0)
  +             errno = aio_error(&aio->cb);
  +         aio = rpmaioFree(aio);
  +#endif
   
        }
   
  @@ -850,7 +1026,7 @@
       default:
        break;
       }
  -SPEW("<== %s(%p,%d) qid %d rc %d\n", __FUNCTION__, msq, delete, (msq ? msq->qid : -1), rc);
  +SPEW("<==\t%s(%p,%d) qid %d rc %d\n", __FUNCTION__, msq, delete, (msq ? msq->qid : -1), rc);
       return rc;
   }
   
  @@ -969,6 +1145,33 @@
       return rc;
   }
   
  +static ssize_t rpmmsqReaderRead(rpmmsq msq, char *b, size_t nb, unsigned *priop)
  +{
  +    ssize_t rc;
  +
  +    errno = 0;
  +    do {
  +     struct timespec ts = { 0, 1000 };
  +         
  +     rc = Mq_receive(msq->qid, b, nb, priop);
  +     if (rc < 0) {
  +         switch (errno) {
  +         case ETIMEDOUT:     /* XXX Mq_timedreceive() */
  +             msq->ntimeout++;
  +             Z(nanosleep(&ts, NULL));
  +             continue;
  +         case EAGAIN:        /* XXX O_NONBLOCK */
  +             msq->nagain++;
  +             Z(nanosleep(&ts, NULL));
  +             continue;
  +         }
  +     }
  +     break;
  +    } while (rc < 0);
  +
  +    return rc;
  +}
  +
   void rpmmsqReader(union sigval sv)
   {
   #if defined(WITH_MQ)
  @@ -976,50 +1179,66 @@
   assert(msq = rpmmsqLink(msq));
   
   SPEW("==> %s(%p) qid %d\n", __FUNCTION__, msq, msq->qid);
  -    char b[BUFSIZ];
  -    size_t nb = sizeof(b);
  -    int rc;
  +    ssize_t rc;
   
       while (msq->qid != -1) {
  -     unsigned int prio = 0;
  -     int rc;
   
  -     /* Consumer in monitor. */
  -     LOCK(msq->m);
  -     while (msq->inflight == 0)
  -         WAIT(msq->e, msq->m);
  -     errno = 0;
  -     do {
  -         struct timespec ts = { 0, 1000 };
  -         rc = Mq_receive(msq->qid, b, nb, &prio);
  -         if (rc < 0) {
  -             switch (errno) {
  -             case ETIMEDOUT: /* XXX Mq_timedreceive() */
  -                 msq->ntimeout++;
  -                 Z(nanosleep(&ts, NULL));
  -                 continue;
  -             case EAGAIN:    /* XXX O_NONBLOCK */
  -                 msq->nagain++;
  -                 Z(nanosleep(&ts, NULL));
  -                 continue;
  +     while (1) {
  +         rpmioItem item;
  +
  +         /* Get next request from queue. */
  +         RPM_GNUC_TM_ATOMIC {
  +             if ((item = msq->head) == NULL)
  +                 break;
  +             msq->head = item->next;
  +             item->next = NULL;              /* XXX rpmmalloc.c ? */
  +             if (msq->head == NULL)
  +                 msq->tail = &msq->head;
  +         }
  +
  +         rpmaio aio = (rpmaio) item;
  +         char *b = (char *)aio->cb.aio_buf;
  +         size_t nb = aio->cb.aio_nbytes;
  +
  +         errno = 0;
  +         switch (aio->cb.aio_lio_opcode) {
  +         case LIO_READ:
  +         {   unsigned int prio = 0;
  +             rc = rpmmsqReaderRead(msq, b, nb, &prio);
  +             if (rc >= 0) {
  +                 msq->nrecv++;
  +                 aio->cb.aio_reqprio = prio;         /* XXX prio */
  +             }
  +         }   break;
  +         case LIO_WRITE:
  +         {   unsigned int prio = aio->cb.aio_reqprio;        /* XXX prio */
  +             rc = Mq_send(msq->qid, (const char *)b, nb, prio);
  +             if (rc == 0)
  +                 msq->nsent++;
  +         }   break;
  +         case LIO_CLOSE:
  +         {
  +             if (b == NULL && nb == 0) {
  +                 rc = Mq_close(msq->qid);
  +                 if (rc == 0)
  +                     msq->qid = -1;
                }
  +         }   break;
  +         case LIO_DSYNC:
  +         case LIO_SYNC:
  +         case LIO_READ64:
  +         case LIO_WRITE64:
  +         default:
  +             break;
            }
  -         if (rc >= 0)
  -             msq->nrecv++;
  -         break;
  -     } while (rc < 0);
  -     if (msq->inflight-- == msq->msgmax)
  -         SIGNAL(msq->f);
  -     UNLOCK(msq->m);
   
  -     if (rc >= 0) {
  -         /* Deliver the mssage through a callback. */
  -         int xx = rpmmsqDeliver(msq, b, rc, prio);
  -         (void)xx;
  +         aio->cb.__error_code = errno;               /* XXX */
  +         aio->cb.__return_value = rc;                /* XXX */
  +         aio = rpmaioFree(aio);
        }
       }
   
  -SPEW("<== %s(%p) qid %d rc %d\n", __FUNCTION__, msq, msq->qid, rc);
  +SPEW("<== %s(%p) qid %d rc %ld\n", __FUNCTION__, msq, msq->qid, rc);
   
       msq = rpmmsqFree(msq);
   
  @@ -1029,24 +1248,36 @@
   static void rpmmsqDumpST(const char * msg, void * _ptr, FILE * fp)
   {
       struct stat *st = (struct stat *) _ptr;
  +    char b[BUFSIZ];
  +    char * be = b;
  +    int colorize = isatty(fileno(stderr));
   
       if (fp == NULL)  fp = stderr;
  -    if (msg)         fprintf(fp, "================ %s\n", msg);
  +#define ANSI_BRIGHT_BLUE     "\x1b[34;1m"
  +#define ANSI_RESET           "\x1b[0m"
  +    if (colorize)    be = stpcpy(be, ANSI_BRIGHT_BLUE);
  +    if (msg) be += sprintf(be, "========================== %s\n", msg);
   
       if (st) {
  -     fprintf(fp, "\t    dev: 0x%lx\n", (unsigned long)st->st_dev);
  -     fprintf(fp, "\t    ino: %lu\n", (unsigned long)st->st_ino);
  -     fprintf(fp, "\t   mode: 0%o\n", st->st_mode);
  -     fprintf(fp, "\t  nlink: %lu\n", (unsigned long)st->st_nlink);
  -     fprintf(fp, "\t    uid: %d\n", st->st_uid);
  -     fprintf(fp, "\t    gid: %d\n", st->st_gid);
  -     fprintf(fp, "\t   rdev: 0x%lx\n", (unsigned long)st->st_rdev);
  -     fprintf(fp, "\t   size: %lu\n", (unsigned long)st->st_size);
  -     fprintf(fp, "\tblksize: %lu\n", (unsigned long)st->st_blksize);
  -     fprintf(fp, "\t blocks: %lu\n", (unsigned long)st->st_blocks);
  -     fprintf(fp, "\t  atime: %lu\n", (unsigned long)st->st_atime);
  -     fprintf(fp, "\t  mtime: %lu\n", (unsigned long)st->st_mtime);
  -     fprintf(fp, "\t  ctime: %lu\n", (unsigned long)st->st_mtime);
  +#define PRINT_STAT(_fmt, _foo) \
  +    {        be += sprintf(be, "%25s: %"#_fmt"\n", #_foo, st->st_##_foo); }
  +     PRINT_STAT(lx, dev);
  +     PRINT_STAT(lu, ino);
  +     PRINT_STAT( o, mode);
  +     PRINT_STAT(lu, nlink);
  +     PRINT_STAT( d, uid);
  +     PRINT_STAT( d, gid);
  +     PRINT_STAT(lx, rdev);
  +     PRINT_STAT(zu, size);
  +     PRINT_STAT(zu, blksize);
  +     PRINT_STAT(zu, blocks);
  +     PRINT_STAT(lu, atime);
  +     PRINT_STAT(lu, mtime);
  +     PRINT_STAT(lu, ctime);
  +#undef       PRINT_STAT
  +     *(--be) = '\0';
  +     if (colorize) be = stpcpy(be, ANSI_RESET);
  +     fprintf(fp, "%s\n", b);
       }
   }
   
  @@ -1123,11 +1354,10 @@
            break;
   
        const char * lpath = rpmGetPath("/dev/mqueue/", msq->qname, NULL);
  -     struct stat sb;
   
  -     rc = Stat(lpath, &sb);
  +     rc = Stat(lpath, &msq->sb);
        if (!rc)
  -         rpmmsqDumpST(lpath, &sb, NULL);
  +         rpmmsqDumpST(lpath, &msq->sb, NULL);
   
        /* XXX .fdio avoids select/poll issues on /dev/mqueue with .ufdio. */
        {   FD_t fd = Fopen(lpath, "r.fdio");
  @@ -1149,16 +1379,14 @@
       case RPMMSQ_TYPE_SYSV:
       {
   #if defined(WITH_MSQ)
  -     struct msqid_ds ds;
  -     rc = rpmmsqCtl(msq, IPC_STAT, &ds);
  +     rc = rpmmsqCtl(msq, IPC_STAT, &msq->ds);
        if (!rc)
  -         rpmmsqDumpDS(NULL, &ds, NULL);
  +         rpmmsqDumpDS(NULL, &msq->ds, NULL);
   
   #if defined(IPC_INFO)        /* XXX linux-only? */
  -     struct msginfo mi;
  -     rc = rpmmsqCtl(msq, IPC_INFO, (struct msqid_ds *)&mi);
  +     rc = rpmmsqCtl(msq, IPC_INFO, (struct msqid_ds *)&msq->mi);
        if (!rc)
  -         rpmmsqDumpMI(NULL, &mi, NULL);
  +         rpmmsqDumpMI(NULL, &msq->mi, NULL);
   #endif
   #endif       /* WITH_MSQ */
       }        break;
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/poptIO.c
  ============================================================================
  $ cvs diff -u -r1.94.2.30 -r1.94.2.31 poptIO.c
  --- rpm/rpmio/poptIO.c        25 May 2017 19:44:41 -0000      1.94.2.30
  +++ rpm/rpmio/poptIO.c        28 May 2017 18:01:52 -0000      1.94.2.31
  @@ -865,18 +865,20 @@
    { "rpmiobdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmiob_debug, -1,
        N_("Debug rpmio I/O buffers"), NULL},
   
  + { "rpmaiodebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmaio_debug, -1,
  +     N_("Debug AIO wrapper"), NULL},
    { "rpmasndebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmasn_debug, -1,
        N_("Debug embedded ASN.1 interpreter"), NULL},
    { "rpmbagdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmbag_debug, -1,
        N_("Debug depsolver wrappers "), NULL},
    { "rpmcvsdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmcvs_debug, -1,
  -     N_("Debug CVS wrappers "), NULL},
  +     N_("Debug CVS wrappers"), NULL},
    { "rpmgitdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmgit_debug, -1,
  -     N_("Debug GIT wrappers "), NULL},
  +     N_("Debug GIT wrappers"), NULL},
    { "rpmsetdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmset_debug, -1,
  -     N_("Debug SET-VERSION wrappers "), NULL},
  +     N_("Debug SET-VERSION wrappers"), NULL},
    { "rpmsvndebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmsvn_debug, -1,
  -     N_("Debug Subversion wrappers "), NULL},
  +     N_("Debug Subversion wrappers"), NULL},
    { "rpmtpmdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmtpm_debug, -1,
        N_("Debug TPM emulator"), NULL},
   
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmio.c
  ============================================================================
  $ cvs diff -u -r1.230.2.52 -r1.230.2.53 rpmio.c
  --- rpm/rpmio/rpmio.c 27 May 2017 14:22:18 -0000      1.230.2.52
  +++ rpm/rpmio/rpmio.c 28 May 2017 18:01:53 -0000      1.230.2.53
  @@ -2659,7 +2659,6 @@
    * - bzopen: 'q' sets verbosity to 0
    * - bzopen: 'v' does verbosity++ (up to 4)
    * - HACK:   '.' terminates, rest is type of I/O
  - * - HACK:   'l' loopback mode (msqio)
    * - HACK:   'n' non-blocking (O_NONBLOCK)
    * - HACK:   't' truncate (O_TRUNC)
    * - HACK:   'D' sync (O_DIRECT)
  @@ -2732,8 +2731,6 @@
            if (--nstdio > 0) *stdio++ = c;
            continue;
            break;
  -     case 'l':               /* XXX loopback mode (rpmmsqNew) */
  -         goto other;
        case 'n':
            flags |= O_NONBLOCK;
            goto other;
  @@ -3779,6 +3776,7 @@
       RPMIOPOOL_FREE(sp)
       RPMIOPOOL_INTERP_FREE(sx)
   
  +    RPMIOPOOL_FREE(aio)
       RPMIOPOOL_FREE(msq)
   
       RPMIOPOOL_FREE(html)
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmalloc.c
  ============================================================================
  $ cvs diff -u -r1.29.2.7 -r1.29.2.8 rpmmalloc.c
  --- rpm/rpmio/rpmmalloc.c     27 May 2017 14:22:18 -0000      1.29.2.7
  +++ rpm/rpmio/rpmmalloc.c     28 May 2017 18:01:53 -0000      1.29.2.8
  @@ -115,7 +115,7 @@
       pool->size = size;
       pool->limit = limit;
       pool->flags = flags;
  -    pool->dbug = (char * (*)(void *item, char *b, size_t nb)) dbug;
  +    pool->dbug = dbug;
       pool->init = init;
       pool->fini = fini;
       pool->reused = 0;
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmsq.h
  ============================================================================
  $ cvs diff -u -r1.1.2.11 -r1.1.2.12 rpmmsq.h
  --- rpm/rpmio/rpmmsq.h        26 May 2017 19:53:49 -0000      1.1.2.11
  +++ rpm/rpmio/rpmmsq.h        28 May 2017 18:01:53 -0000      1.1.2.12
  @@ -26,20 +26,68 @@
       RPMMSQ_FLAGS_SYSV                = RPMMSQ_TYPE_SYSV,
        /* bits 2-7 reserved */
       RPMMSQ_FLAGS_INFO                = _MFB( 8),
  -    RPMMSQ_FLAGS_LOOP                = _MFB( 9),
  -    RPMMSQ_FLAGS_RESET               = _MFB(10),
  -    RPMMSQ_FLAGS_DELETE              = _MFB(11),
  -    RPMMSQ_FLAGS_DEBUG               = _MFB(12),
  -     /* bits 13-31 unused */
  +    RPMMSQ_FLAGS_RESET               = _MFB( 9),
  +    RPMMSQ_FLAGS_DELETE              = _MFB(10),
  +    RPMMSQ_FLAGS_DEBUG               = _MFB(11),
  +     /* bits 12-31 unused */
   } rpmmsqFlags;
   #undef  _MFB
   #undef  _KFB
   
   /**
    */
  +#include <aio.h>
  +
  +enum {
  +    LIO_DSYNC        = LIO_NOP+1,
  +    LIO_SYNC,
  +    LIO_CLOSE,
  +    LIO_READ64       = LIO_READ | 128,
  +    LIO_WRITE64      = LIO_WRITE | 128,
  +};
  +
  +extern int _rpmaio_debug;
  +
  +typedef struct rpmaio_s * rpmaio;
  +
  +struct rpmaio_s {
  +    struct rpmioItem_s _item;        /*!< usage mutex and pool identifier. */
  +    struct aiocb cb;
  +    int ix;
  +};
  +
  +/**
  + * Unreference an aio wrapper instance.
  + * @param aio                aio wrapper
  + * @return           NULL on last dereference
  + */
  +rpmaio rpmaioUnlink (rpmaio aio);
  +#define rpmaioUnlink(_aio)   \
  +    ((rpmaio)rpmioUnlinkPoolItem((rpmioItem)(_aio), __FUNCTION__, __FILE__, __LINE__))
  +
  +/**
  + * Reference an aio wrapper instance.
  + * @param aio                aio wrapper
  + * @return           new aio wrapper reference
  + */
  +rpmaio rpmaioLink (rpmaio aio);
  +#define rpmaioLink(_aio)     \
  +    ((rpmaio)rpmioLinkPoolItem((rpmioItem)(_aio), __FUNCTION__, __FILE__, __LINE__))
  +
  +/**
  + * Destroy an aio wrapper.
  + * @param aio                aio wrapper
  + * @return           NULL on last dereference
  + */
  +rpmaio rpmaioFree(rpmaio aio);
  +#define rpmaioFree(_aio)     \
  +    ((rpmaio)rpmioFreePoolItem((rpmioItem)(_aio), __FUNCTION__, __FILE__, __LINE__))
  +
   #if defined(_RPMMSQ_INTERNAL)
   struct rpmmsq_s {
       struct rpmioItem_s _item;        /*!< usage mutex and pool identifier. */
  +    rpmioItem head;          /*!< message queue head. */
  +    rpmioItem * tail;                /*!< message queue tail. */
       const char * qname;              /*!> message queue path. */
       const char * fmode;
       int fdno;
  @@ -52,17 +100,25 @@
       long mtype;                      /*!< SysV: message type. */
   
       int msgmax;                      /*!< max. inflight messages. */
  -    int msgsize;                     /*!< max. message size. */
  +    int msgsize;             /*!< max. message size. */
   
  -    pthread_mutex_t m;               /*!< monitor mutex. */
  -    pthread_cond_t  e;               /*!< empty condition. */
  -    pthread_cond_t  f;               /*!< full condition. */
  -    int inflight;            /*!< no. of inflight messages. */
  +    int nqueued;             /*!< no. messages queued. */
       int nsent;                       /*!< no. messages sent. */
       int nrecv;                       /*!< no. messages received. */
       int ntimeout;            /*!< no. of receive timeouts. */
       int nagain;                      /*!< no. of waits (O_NONBLOCK). */
   
  +    struct mq_attr oattrs;   /*!< mq_open attributes. */
  +    pthread_attr_t attr;     /*!< mq_notify thread attr */
  +    struct sigevent sigev;   /*!< mq_notify sigevent_t */
  +
  +    struct stat sb;
  +#if defined(WITH_MSQ)
  +    struct msqid_ds ds;
  +    struct msginfo mi;
  +#endif       /* WITH_MSQ */
  +
  +
       rpmzLog zlog;
   };
   #endif       /* _RPMMSQ_INTERNAL */
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/tmq.c
  ============================================================================
  $ cvs diff -u -r1.1.2.10 -r1.1.2.11 tmq.c
  --- rpm/rpmio/tmq.c   26 May 2017 13:22:18 -0000      1.1.2.10
  +++ rpm/rpmio/tmq.c   28 May 2017 18:01:53 -0000      1.1.2.11
  @@ -62,7 +62,6 @@
        RPMMSQ_FLAGS_DEBUG |
        RPMMSQ_FLAGS_DELETE |
        RPMMSQ_FLAGS_RESET |
  -     RPMMSQ_FLAGS_LOOP |
        RPMMSQ_FLAGS_INFO |
        RPMMSQ_FLAGS_SYSV |
        RPMMSQ_FLAGS_POSIX |
  @@ -337,14 +336,16 @@
   static int doMSQ(ARGV_t av, int ac)
   {
       int rc = -2;     /* assume failure */
  -    char b[BUFSIZ];
  -    size_t nb = sizeof(b);
  +    char wb[BUFSIZ];
  +    size_t nwb = sizeof(wb);
  +    size_t nw;
  +    char rb[BUFSIZ];
  +    size_t nrb = sizeof(rb);
  +    size_t nr;
       size_t blen;
       int xx;
   
  -    (void)b;
  -    (void)nb;
  -    (void)blen;
  +    (void)nrb;
   
   #define F_ISSET(_f, _FLAG) ((_f) & RPMMSQ_FLAGS_##_FLAG)
   fprintf(stderr, "*** %s: flags 0x%x\n", __FUNCTION__, flags);
  @@ -360,7 +361,6 @@
       if (F_ISSET(flags, RESET))       *te++ = 'R';
       if (F_ISSET(flags, INFO))        *te++ = 'I';
       if (F_ISSET(flags, DELETE))      *te++ = 'D';
  -    if (F_ISSET(flags, LOOP))        *te++ = 'l';
       if (oflags & O_CLOEXEC)  *te++ = 'e';
       if (oflags & O_NONBLOCK) *te++ = 'n';
       if (oflags & O_TRUNC)    *te++ = 't';
  @@ -378,28 +378,35 @@
   #ifdef       DISABLE
       rpmmsq msq = rpmmsqNew(qname, fmode, -1, flags);
   
  -    memset(b, 0, nb);
  +    memset(wb, 0, nwb);
       if (msq) {
  -
  -     strcpy(b, "yadda yadda");
  -     blen = strlen(b);
  -     xx = rpmmsqSend(msq, b, blen, priority);
  -
  -     strcpy(b, "bing bang boom");
  -     blen = strlen(b);
  -     xx = rpmmsqSend(msq, b, blen, priority);
  -
  -     memset(b, 0, nb);
  -     xx = rpmmsqRecv(msq, b, nb, NULL);
  -     memset(b, 0, nb);
  -     xx = rpmmsqRecv(msq, b, nb, NULL);
  +     const char msg1[] = "yadda yadda";
  +     size_t nmsg1 = sizeof(msg1) - 1;
  +     const char msg2[] = "bing bang boom";
  +     size_t nmsg2 = sizeof(msg2) - 1;
  +
  +     strcpy(wb, msg1);
  +     blen = strlen(wb);
  +     nw = rpmmsqSend(msq, wb, blen, priority);
  +     assert(nw == blen);
  +
  +     strcpy(wb, msg2);
  +     blen = strlen(wb);
  +     nw = rpmmsqSend(msq, wb, blen, priority);
  +     assert(nw == blen);
  +
  +     memset(rb, 0, nrb);
  +     nr = rpmmsqRecv(msq, rb, nrb, NULL);
  +     assert(nr == nmsg1);
  +     memset(rb, 0, nrb);
  +     nr = rpmmsqRecv(msq, rb, nrb, NULL);
  +     assert(nr == nmsg2);
   
        for (int i = 0; i < count; i++) {
  -         snprintf(b, nb, "%d", i);
  -         blen = strlen(b);
  -         xx = rpmmsqSend(msq, b, blen, priority);
  -         memset(b, 0, nb);
  -         xx = rpmmsqRecv(msq, b, nb, NULL);
  +         blen = snprintf(wb, nwb, "%d", i);
  +         nw = rpmmsqSend(msq, wb, blen, priority);
  +         memset(rb, 0, blen);
  +         nr = rpmmsqRecv(msq, rb, nrb, NULL);
        }
       }
   
  @@ -410,37 +417,48 @@
       FD_t fd = Fopen(qname, fmode);
       if (fd) {
   
  -#ifndef      DISABLE
  +#ifdef       DISABLE
        xx = Ftell(fd);
  -     strcpy(b, "foo bar baz");
  -     blen = strlen(b);
  -     xx = Fwrite(b, 1, blen, fd);
  +#endif       /* DISABLE */
  +
  +     strcpy(wb, "foo bar baz");
  +     blen = strlen(wb);
  +     nw = Fwrite(wb, 1, blen, fd);
  +
  +#ifdef       DISABLE
        xx = Ferror(fd);
        xx = Fflush(fd);
        xx = Ftell(fd);
  +#endif       /* DISABLE */
  +
  +     memset(rb, 0, blen);
  +     nr = Fread(rb, 1, blen, fd);
  +     assert(nr == nw && !strncmp(rb, wb, blen));
   
  -     memset(b, 0, blen);
  -     xx = Fread(b, 1, blen, fd);
  +#ifdef       DISABLE
        xx = Ftell(fd);
   
        xx = Fileno(fd);
   if (_rpmio_debug)
   fprintf(stderr, "<== Fileno(%p) rc %d\n", fd, xx);
  +#endif       /* DISABLE */
   
  -     strcpy(b, "blah blah blah");
  -     blen = strlen(b);
  -     xx = Fwrite(b, 1, blen, fd);
  -     memset(b, 0, blen);
  -     xx = Fread(b, 1, blen, fd);
  +     strcpy(wb, "blah blah blah");
  +     blen = strlen(wb);
  +     nw = Fwrite(wb, 1, blen, fd);
  +     memset(rb, 0, blen);
  +     nr = Fread(rb, 1, blen, fd);
  +     assert(nr == nw && !strncmp(rb, wb, blen));
   
        for (int i = 0; i < count; i++) {
  -         snprintf(b, nb, "%d", i);
  -         blen = strlen(b);
  -         xx = Fwrite(b, 1, blen, fd);
  -         memset(b, 0, blen);
  -         xx = Fread(b, 1, blen, fd);
  +         blen = snprintf(wb, nwb, "%d", i);
  +         nw = Fwrite(wb, 1, blen, fd);
  +         memset(rb, 0, blen);
  +         nr = Fread(rb, 1, blen, fd);
  +         assert(nr == nw && !strncmp(rb, wb, blen));
        }
   
  +#ifdef       DISABLE
   #ifdef       NOTYET
        xx = Feof(fd);
        xx = Clearerr(fd);
  @@ -450,14 +468,13 @@
        xx = Fgetpos(fd, &pos);
        xx = Fsetpos(fd, &pos);
   
  -#else
        FD_t nfd = Fdopen(fd, fmode);
        if (nfd) {
  -         strcpy(b, "foo bar baz");
  -         blen = strlen(b);
  -         xx = Fwrite(b, 1, blen, nfd);
  -         memset(b, 0, blen);
  -         xx = Fread(b, 1, blen, nfd);
  +         strcpy(wb, "foo bar baz");
  +         blen = strlen(wb);
  +         nw = Fwrite(wb, 1, blen, nfd);
  +         memset(rb, 0, blen);
  +         nr = Fread(rb, 1, blen, nfd);
            xx = Fclose(nfd);
        }
   #endif       /* DISABLE */
  @@ -494,9 +511,6 @@
        N_("Display queue info on close"), NULL },
    { "delete", 'D', POPT_BIT_SET|POPT_ARGFLAG_TOGGLE, &flags, _MSQBIT(DELETE),
        N_("Remove queue after closing"), NULL },
  -     /* XXX -L no workie */
  - { "loopback", 'L', POPT_BIT_SET|POPT_ARGFLAG_TOGGLE,&flags, _MSQBIT(LOOP),
  -     N_("Read messages after sending"), NULL },
   
    { "count", 'c', POPT_ARG_INT,                           &count, 0,
        N_("Send COUNT messages"), N_("COUNT") },
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                [email protected]

Reply via email to