RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   29-May-2017 00:14:55
  Branch: rpm-5_4                          Handle: 2017052822145500

  Modified files:           (Branch: rpm-5_4)
    rpm/rpmio               msqio.c rpmmsq.h

  Log:
    - msqio: refactor error code retrieval into aioqPutWait.

  Summary:
    Revision    Changes     Path
    1.1.2.18    +71 -40     rpm/rpmio/msqio.c
    1.1.2.15    +2  -0      rpm/rpmio/rpmmsq.h
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/rpmio/msqio.c
  ============================================================================
  $ cvs diff -u -r1.1.2.17 -r1.1.2.18 msqio.c
  --- rpm/rpmio/msqio.c 28 May 2017 20:18:16 -0000      1.1.2.17
  +++ rpm/rpmio/msqio.c 28 May 2017 22:14:55 -0000      1.1.2.18
  @@ -72,7 +72,7 @@
   {
       rpmmsq msq = NULL;
       mqd_t mqdes = mq_open(name, oflag, mode, attr);
  -SPEW("<--\t%s(%s,0x%x,0%o,%p) qid %d\n", __FUNCTION__, name, oflag, mode, 
attr, mqdes);
  +SPEW("<--\t    %s(%s,0x%x,0%o,%p) qid %d\n", __FUNCTION__, name, oflag, 
mode, attr, mqdes);
       return mqdes;
   }
   
  @@ -81,7 +81,7 @@
   {
       rpmmsq msq = NULL;
       int rc = mq_getattr(mqdes, attr);
  -SPEW("<--\t%s(0x%x,%p) rc %d\n", __FUNCTION__, mqdes, attr, rc);
  +SPEW("<--\t    %s(0x%x,%p) rc %d\n", __FUNCTION__, mqdes, attr, rc);
       return rc;
   }
   
  @@ -90,7 +90,7 @@
   {
       rpmmsq msq = NULL;
       int rc = mq_setattr(mqdes, newattr, oldattr);
  -SPEW("<--\t%s(0x%x,%p,%p) rc %d\n", __FUNCTION__, mqdes, newattr, oldattr, 
rc);
  +SPEW("<--\t    %s(0x%x,%p,%p) rc %d\n", __FUNCTION__, mqdes, newattr, 
oldattr, rc);
       return rc;
   }
   
  @@ -99,7 +99,7 @@
   {
       rpmmsq msq = NULL;
       int rc = mq_notify(mqdes, sevp);
  -SPEW("<--\t%s(0x%x,%p) rc %d\n", __FUNCTION__, mqdes, sevp, rc);
  +SPEW("<--\t    %s(0x%x,%p) rc %d\n", __FUNCTION__, mqdes, sevp, rc);
       return rc;
   }
   
  @@ -124,7 +124,7 @@
            break;
        }
       }
  -SPEW("<--\t%s(0x%x,%p[%lu],%u) rc %d\t\t\t\"%.*s\"\n", __FUNCTION__, mqdes, 
buf, (unsigned long)count, prio, rc, nc, buf);
  +SPEW("<--\t    %s(0x%x,%p[%lu],%u) rc %d\t\t\t\"%.*s\"\n", __FUNCTION__, 
mqdes, buf, (unsigned long)count, prio, rc, nc, buf);
       return rc;
   }
   
  @@ -150,7 +150,7 @@
            break;
        }
       }
  -SPEW("<--\t%s(0x%x,%p[%lu],%u,%p) rc %d\t\t\t\"%.*s\"\n", __FUNCTION__, 
mqdes, buf, (unsigned long)count, prio, abs_timeout, rc, nc, buf);
  +SPEW("<--\t    %s(0x%x,%p[%lu],%u,...) rc %d\t\t\t\"%.*s\"\n", __FUNCTION__, 
mqdes, buf, (unsigned long)count, prio, rc, nc, buf);
       return rc;
   }
   
  @@ -164,7 +164,6 @@
       int nc = 0;
       char * buf = msg_ptr;
       size_t count = msg_len;
  -    unsigned int *priop = msg_prio;
       unsigned int prio = *msg_prio;
       ssize_t rc;
   
  @@ -177,7 +176,7 @@
            break;
        }
       }
  -SPEW("<--\t%s(0x%x,%p[%lu],%p) rc %ld prio %u\t\"%.*s\"\n", __FUNCTION__, 
mqdes, buf, (unsigned long)count, priop, (long)rc, prio, nc, buf);
  +SPEW("<--\t    %s(0x%x,%p[%lu],...) rc %ld prio %u\t\"%.*s\"\n", 
__FUNCTION__, mqdes, buf, (unsigned long)count, rc, prio, nc, buf);
       return rc;
   }
   
  @@ -191,7 +190,6 @@
       int nc = 0;
       char * buf = msg_ptr;
       size_t count = msg_len;
  -    unsigned int *priop = msg_prio;
       unsigned int prio = *msg_prio;
       ssize_t rc;
   
  @@ -204,7 +202,7 @@
            break;
        }
       }
  -SPEW("<--\t%s(0x%x,%p[%lu],%p,%p) rc %ld prio %u\t\"%.*s\"\n", __FUNCTION__, 
mqdes, buf, (unsigned long)count, priop, abs_timeout, (long)rc, prio, nc, buf);
  +SPEW("<--\t    %s(0x%x,%p[%lu],...) rc %ld prio %u\t\"%.*s\"\n", 
__FUNCTION__, mqdes, buf, (unsigned long)count, rc, prio, nc, buf);
       return rc;
   }
   
  @@ -213,7 +211,7 @@
   {
       rpmmsq msq = NULL;
       int rc = mq_close(mqdes);
  -SPEW("<--\t%s(0x%x) rc %d\n", __FUNCTION__, mqdes, rc);
  +SPEW("<--\t    %s(0x%x) rc %d\n", __FUNCTION__, mqdes, rc);
       return rc;
   }
   
  @@ -222,7 +220,7 @@
   {
       rpmmsq msq = NULL;
       int rc = mq_unlink(name);
  -SPEW("<--\t%s(%s) rc %d\n", __FUNCTION__, name, rc);
  +SPEW("<--\t    %s(%s) rc %d\n", __FUNCTION__, name, rc);
       return rc;
   }
   
  @@ -235,7 +233,7 @@
   {
       rpmmsq msq = NULL;
       int rc = msgget(key, msgflg);
  -SPEW("<--\t%s(0x%x,0%o) rc %d\n", __FUNCTION__, key, msgflg, rc);
  +SPEW("<--\t    %s(0x%x,0%o) rc %d\n", __FUNCTION__, key, msgflg, rc);
       return rc;
   }
   
  @@ -258,7 +256,7 @@
            break;
        }
       }
  -SPEW("<--\t%s(0x%x,%p,%lu,%ld,%d) rc %ld\t\"%.*s\"\n", __FUNCTION__, msqid, 
msgp, (unsigned long)msgsz, msgtyp, msgflg, (long)rc, nc, buf);
  +SPEW("<--\t    %s(0x%x,%p,%lu,%ld,%d) rc %ld\t\"%.*s\"\n", __FUNCTION__, 
msqid, msgp, (unsigned long)msgsz, msgtyp, msgflg, (long)rc, nc, buf);
       return rc;
   }
   
  @@ -281,7 +279,7 @@
            break;
        }
       }
  -SPEW("<--\t%s(0x%x,%p,%lu,%d) rc %d\t\t\"%.*s\"\n", __FUNCTION__, msqid, 
msgp, (unsigned long)msgsz, msgflg, rc, nc, buf);
  +SPEW("<--\t    %s(0x%x,%p,%lu,%d) rc %d\t\t\"%.*s\"\n", __FUNCTION__, msqid, 
msgp, (unsigned long)msgsz, msgflg, rc, nc, buf);
       return rc;
   }
   
  @@ -290,7 +288,7 @@
   {
       rpmmsq msq = NULL;
       int rc = msgctl(msqid, cmd, buf);
  -SPEW("<--\t%s(0x%x,%d,%p) rc %d\n", __FUNCTION__, msqid, cmd, buf, rc);
  +SPEW("<--\t    %s(0x%x,%d,%p) rc %d\n", __FUNCTION__, msqid, cmd, buf, rc);
       return rc;
   }
   #endif       /* WITH_MSQ */
  @@ -317,7 +315,7 @@
        PRINT(d, msgsize);
   #endif       /* WITH_MSQ */
        if (stats) {
  -         PRINT(d, nqueued);
  +         PRINT(d, inflight);
            PRINT(d, nsent);
            PRINT(d, nrecv);
            PRINT(d, ntimeout);
  @@ -326,6 +324,7 @@
        PRINT(p, aioq.head);
        PRINT(p, aioq.tail);
        PRINT(d, nqueued);
  +     PRINT(d, ndelay);
   #undef       PRINT
       }
   }
  @@ -358,15 +357,18 @@
   static inline
   void aioqInit(AIOQ_t aioq)
   {
  +    rpmmsq msq = NULL;
       RPM_GNUC_TM_ATOMIC {
        aioq->head = NULL;
        aioq->tail = &aioq->head;
       }
  +SPEW("<--\t%s(%p)\n", __FUNCTION__, aioq);
   }
   
   static inline
   rpmioItem aioqGetHead(AIOQ_t aioq)
   {
  +    rpmmsq msq = NULL;
       rpmioItem item;
       RPM_GNUC_TM_ATOMIC {
        if ((item = aioq->head) != NULL) {
  @@ -376,25 +378,43 @@
                aioq->tail = &aioq->head;
        }
       }
  +if (item)
  +SPEW("<--\t%s(%p) item %p\n", __FUNCTION__, aioq, item);
       return item;
   }
   
   static inline
   void aioqPutTail(AIOQ_t aioq, rpmioItem item)
   {
  +    rpmmsq msq = NULL;
       RPM_GNUC_TM_ATOMIC {
        *aioq->tail = item;
        aioq->tail = (rpmioItem *) &item->next;
       }
  +SPEW("<--\t%s(%p,%p)\n", __FUNCTION__, aioq, item);
   }
   
   static inline
  -void aioqPutWait(AIOQ_t aioq, rpmioItem item, rpmzLog zlog)
  +ssize_t aioqPutWait(AIOQ_t aioq, rpmioItem item, rpmzLog zlog,
  +             unsigned long *priop)
   {
  +    rpmmsq msq = NULL;
  +    ssize_t rc;
  +
       POSSESS(item->use);
       aioqPutTail(aioq, item);
       WAITFOR(item->use, TO_BE, 1);
       RELEASE(item->use);
  +
  +    rpmaio aio = (rpmaio) item;
  +    rc = aio_return(&aio->cb);
  +    if (rc >= 0 && priop)
  +     *priop = aio->cb.aio_reqprio;           /* XXX prio */
  +    if (rc < 0)
  +     errno = aio_error(&aio->cb);            /* XXX W2DO? */
  +
  +SPEW("<--\t%s(%p,%p) rc %ld\n", __FUNCTION__, aioq, item, rc);
  +    return rc;
   }
   
   /* =============================================================== */
  @@ -431,7 +451,7 @@
        PRINT(ld, mtype);
        PRINT(d, msgmax);
        PRINT(d, msgsize);
  -     PRINT(d, nqueued);
  +     PRINT(d, inflight);
        PRINT(d, nsent);
        PRINT(d, nrecv);
        PRINT(d, ntimeout);
  @@ -454,6 +474,7 @@
        PRINT(p, aioq.head);
        PRINT(p, aioq.tail);
        PRINT(d, nqueued);
  +     PRINT(d, ndelay);
   
   #undef       PRINT_SIGEV
   #undef       PRINT_ATTRS
  @@ -474,7 +495,7 @@
       rpmmsq msq = (rpmmsq) _msq;
   
       if (msq) {
  -SPEW("%s: queued %d sent %d recv %d timeout %d again %d\n", __FUNCTION__, 
msq->nqueued, msq->nsent, msq->nrecv, msq->ntimeout, msq->nagain);
  +SPEW("%s: inflight %d queued %d sent %d recv %d timeout %d again %d delay 
%d\n", __FUNCTION__, msq->inflight, msq->nqueued, msq->nsent, msq->nrecv, 
msq->ntimeout, msq->nagain, msq->ndelay);
        msq->msgmax = MSQ_MSGMAX;
        msq->msgsize = MSQ_MSGSIZE;
   
  @@ -503,6 +524,8 @@
                        __FUNCTION__, __FILE__, __LINE__);
        }
        msq->nqueued = 0;
  +     msq->inflight = 0;
  +     msq->ndelay = 0;
   
        if (msq->zlog)
            msq->zlog = rpmzLogDump(msq->zlog, NULL);
  @@ -617,6 +640,8 @@
       /* XXX TODO: use POSIX shared mutexes? */
       aioqInit(&msq->aioq);
       msq->nqueued = 0;
  +    msq->inflight = 0;
  +    msq->ndelay = 0;
       msq->nsent = 0;
       msq->nrecv = 0;
       msq->ntimeout = 0;
  @@ -739,19 +764,16 @@
                        __FUNCTION__, __FILE__, __LINE__);
   
        /* Queue the read. */
  -     aioqPutWait(&msq->aioq, item, msq->zlog);
  -
  -     rc = aio->cb.__return_value;            /* XXX aio_return */
  +     rc = aioqPutWait(&msq->aioq, item, msq->zlog, priop);
  +#ifdef       DYING
        if (rc >= 0)
            prio = aio->cb.aio_reqprio;         /* XXX prio */
  -     else
  -         errno = aio_error(&aio->cb);
  +#endif
  +
        aio = rpmaioFree(aio);
   
  -     if (rc >= 0) {
  -         if (priop)
  -             *priop = prio;
  -     }
  +     if (rc >= 0 && priop)
  +         *priop = prio;
   #endif       /* WITH_MQ */
       }        break;
   
  @@ -796,12 +818,13 @@
            rc = Mq_send(msq->qid, buf, count, prio);
            if (rc == 0)
                msq->nsent++;
  +         msq->inflight++;
   
            /* Yield the CPU. */
            sched_yield();
            struct timespec ts = { 0, 100*1000 };
            Z(nanosleep(&ts, NULL));
  -         /* Wait for the reader thread. */
  +
        } else {
            rpmaio aio = rpmaioNew(msq->qid, LIO_WRITE, prio, (char *)buf, 
count);
            aio->ix = ++msq->nqueued;
  @@ -811,11 +834,8 @@
                        __FUNCTION__, __FILE__, __LINE__);
   
            /* Queue the write. */
  -         aioqPutWait(&msq->aioq, item, msq->zlog);
  +         rc = aioqPutWait(&msq->aioq, item, msq->zlog, NULL);
   
  -         rc = aio->cb.__return_value;        /* XXX aio_return */
  -         if (rc < 0)
  -             errno = aio_error(&aio->cb);
            aio = rpmaioFree(aio);
        }
   
  @@ -889,6 +909,7 @@
                    msq->qid = -1;
                break;
            }
  +
            unsigned prio = 0;
            rpmaio aio = rpmaioNew(msq->qid, LIO_CLOSE, prio, NULL, 0);
            aio->ix = ++msq->nqueued;
  @@ -898,11 +919,13 @@
                        __FUNCTION__, __FILE__, __LINE__);
   
            /* Queue the write. */
  -         aioqPutWait(&msq->aioq, item, msq->zlog);
  +         rc = aioqPutWait(&msq->aioq, item, msq->zlog, NULL);
  +
  +         /* Yield the CPU. */
  +         sched_yield();
  +         struct timespec ts = { 0, 100*1000 };
  +         Z(nanosleep(&ts, NULL));
   
  -         rc = aio->cb.__return_value;        /* XXX aio_return */
  -         if (rc < 0)
  -             errno = aio_error(&aio->cb);
            aio = rpmaioFree(aio);
   
        }
  @@ -1088,14 +1111,20 @@
            rpmioItem item;
   
            /* Get next request from queue. */
  -         if ((item = aioqGetHead(&msq->aioq)) == NULL)
  +         if ((item = aioqGetHead(&msq->aioq)) == NULL) {
  +             msq->ndelay++;
  +             /* Yield the CPU. */
  +             sched_yield();
  +             struct timespec ts = { 0, 100*1000 };
  +             Z(nanosleep(&ts, NULL));
                break;
  +         }
   
            rpmaio aio = (rpmaio) item;
            char *b = (char *)aio->cb.aio_buf;
            size_t nb = aio->cb.aio_nbytes;
   
  -         errno = 0;
  +         rc = -1;
            switch (aio->cb.aio_lio_opcode) {
            case LIO_READ:
            {   unsigned int prio = 0;
  @@ -1104,6 +1133,8 @@
                    msq->nrecv++;
                    aio->cb.aio_reqprio = prio;         /* XXX prio */
                }
  +             if (msq->inflight > 0)
  +                 msq->inflight--;
            }   break;
            case LIO_WRITE:
            {   unsigned int prio = aio->cb.aio_reqprio;        /* XXX prio */
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmsq.h
  ============================================================================
  $ cvs diff -u -r1.1.2.14 -r1.1.2.15 rpmmsq.h
  --- rpm/rpmio/rpmmsq.h        28 May 2017 20:18:16 -0000      1.1.2.14
  +++ rpm/rpmio/rpmmsq.h        28 May 2017 22:14:55 -0000      1.1.2.15
  @@ -54,6 +54,8 @@
       int msgmax;                      /*!< max. inflight messages. */
       int msgsize;             /*!< max. message size. */
   
  +    int inflight;
  +    int ndelay;
       int nqueued;             /*!< no. messages queued. */
       int nsent;                       /*!< no. messages sent. */
       int nrecv;                       /*!< no. messages received. */
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to