RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: j...@rpm5.org Module: rpm Date: 29-May-2017 00:14:55 Branch: rpm-5_4 Handle: 2017052822145500 Modified files: (Branch: rpm-5_4) rpm/rpmio msqio.c rpmmsq.h Log: - msqio: refactor errocide retrieval into aioqPutWait. Summary: Revision Changes Path 1.1.2.18 +71 -40 rpm/rpmio/msqio.c 1.1.2.15 +2 -0 rpm/rpmio/rpmmsq.h ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/rpmio/msqio.c ============================================================================ $ cvs diff -u -r1.1.2.17 -r1.1.2.18 msqio.c --- rpm/rpmio/msqio.c 28 May 2017 20:18:16 -0000 1.1.2.17 +++ rpm/rpmio/msqio.c 28 May 2017 22:14:55 -0000 1.1.2.18 @@ -72,7 +72,7 @@ { rpmmsq msq = NULL; mqd_t mqdes = mq_open(name, oflag, mode, attr); -SPEW("<--\t%s(%s,0x%x,0%o,%p) qid %d\n", __FUNCTION__, name, oflag, mode, attr, mqdes); +SPEW("<--\t %s(%s,0x%x,0%o,%p) qid %d\n", __FUNCTION__, name, oflag, mode, attr, mqdes); return mqdes; } @@ -81,7 +81,7 @@ { rpmmsq msq = NULL; int rc = mq_getattr(mqdes, attr); -SPEW("<--\t%s(0x%x,%p) rc %d\n", __FUNCTION__, mqdes, attr, rc); +SPEW("<--\t %s(0x%x,%p) rc %d\n", __FUNCTION__, mqdes, attr, rc); return rc; } @@ -90,7 +90,7 @@ { rpmmsq msq = NULL; int rc = mq_setattr(mqdes, newattr, oldattr); -SPEW("<--\t%s(0x%x,%p,%p) rc %d\n", __FUNCTION__, mqdes, newattr, oldattr, rc); +SPEW("<--\t %s(0x%x,%p,%p) rc %d\n", __FUNCTION__, mqdes, newattr, oldattr, rc); return rc; } @@ -99,7 +99,7 @@ { rpmmsq msq = NULL; int rc = mq_notify(mqdes, sevp); -SPEW("<--\t%s(0x%x,%p) rc %d\n", __FUNCTION__, mqdes, sevp, rc); +SPEW("<--\t %s(0x%x,%p) rc %d\n", __FUNCTION__, mqdes, sevp, rc); return rc; } @@ -124,7 +124,7 @@ break; } } -SPEW("<--\t%s(0x%x,%p[%lu],%u) rc %d\t\t\t\"%.*s\"\n", __FUNCTION__, mqdes, buf, (unsigned long)count, prio, rc, nc, buf); +SPEW("<--\t %s(0x%x,%p[%lu],%u) rc %d\t\t\t\"%.*s\"\n", __FUNCTION__, mqdes, buf, (unsigned long)count, prio, rc, nc, buf); return rc; } @@ -150,7 +150,7 @@ break; } } -SPEW("<--\t%s(0x%x,%p[%lu],%u,%p) rc %d\t\t\t\"%.*s\"\n", __FUNCTION__, mqdes, buf, (unsigned long)count, prio, abs_timeout, rc, nc, buf); +SPEW("<--\t %s(0x%x,%p[%lu],%u,...) rc %d\t\t\t\"%.*s\"\n", __FUNCTION__, mqdes, buf, (unsigned long)count, prio, rc, nc, buf); return rc; } @@ -164,7 +164,6 @@ int nc = 0; char * buf = msg_ptr; size_t count = msg_len; - unsigned int *priop = msg_prio; unsigned int prio = *msg_prio; ssize_t rc; @@ -177,7 +176,7 @@ break; } } -SPEW("<--\t%s(0x%x,%p[%lu],%p) rc %ld prio %u\t\"%.*s\"\n", __FUNCTION__, mqdes, buf, (unsigned long)count, priop, (long)rc, prio, nc, buf); +SPEW("<--\t %s(0x%x,%p[%lu],...) rc %ld prio %u\t\"%.*s\"\n", __FUNCTION__, mqdes, buf, (unsigned long)count, rc, prio, nc, buf); return rc; } @@ -191,7 +190,6 @@ int nc = 0; char * buf = msg_ptr; size_t count = msg_len; - unsigned int *priop = msg_prio; unsigned int prio = *msg_prio; ssize_t rc; @@ -204,7 +202,7 @@ break; } } -SPEW("<--\t%s(0x%x,%p[%lu],%p,%p) rc %ld prio %u\t\"%.*s\"\n", __FUNCTION__, mqdes, buf, (unsigned long)count, priop, abs_timeout, (long)rc, prio, nc, buf); +SPEW("<--\t %s(0x%x,%p[%lu],...) rc %ld prio %u\t\"%.*s\"\n", __FUNCTION__, mqdes, buf, (unsigned long)count, rc, prio, nc, buf); return rc; } @@ -213,7 +211,7 @@ { rpmmsq msq = NULL; int rc = mq_close(mqdes); -SPEW("<--\t%s(0x%x) rc %d\n", __FUNCTION__, mqdes, rc); +SPEW("<--\t %s(0x%x) rc %d\n", __FUNCTION__, mqdes, rc); return rc; } @@ -222,7 +220,7 @@ { rpmmsq msq = NULL; int rc = mq_unlink(name); -SPEW("<--\t%s(%s) rc %d\n", __FUNCTION__, name, rc); +SPEW("<--\t %s(%s) rc %d\n", __FUNCTION__, name, rc); return rc; } @@ -235,7 +233,7 @@ { rpmmsq msq = NULL; int rc = msgget(key, msgflg); -SPEW("<--\t%s(0x%x,0%o) rc %d\n", __FUNCTION__, key, msgflg, rc); +SPEW("<--\t %s(0x%x,0%o) rc %d\n", __FUNCTION__, key, msgflg, rc); return rc; } @@ -258,7 +256,7 @@ break; } } -SPEW("<--\t%s(0x%x,%p,%lu,%ld,%d) rc %ld\t\"%.*s\"\n", __FUNCTION__, msqid, msgp, (unsigned long)msgsz, msgtyp, msgflg, (long)rc, nc, buf); +SPEW("<--\t %s(0x%x,%p,%lu,%ld,%d) rc %ld\t\"%.*s\"\n", __FUNCTION__, msqid, msgp, (unsigned long)msgsz, msgtyp, msgflg, (long)rc, nc, buf); return rc; } @@ -281,7 +279,7 @@ break; } } -SPEW("<--\t%s(0x%x,%p,%lu,%d) rc %d\t\t\"%.*s\"\n", __FUNCTION__, msqid, msgp, (unsigned long)msgsz, msgflg, rc, nc, buf); +SPEW("<--\t %s(0x%x,%p,%lu,%d) rc %d\t\t\"%.*s\"\n", __FUNCTION__, msqid, msgp, (unsigned long)msgsz, msgflg, rc, nc, buf); return rc; } @@ -290,7 +288,7 @@ { rpmmsq msq = NULL; int rc = msgctl(msqid, cmd, buf); -SPEW("<--\t%s(0x%x,%d,%p) rc %d\n", __FUNCTION__, msqid, cmd, buf, rc); +SPEW("<--\t %s(0x%x,%d,%p) rc %d\n", __FUNCTION__, msqid, cmd, buf, rc); return rc; } #endif /* WITH_MSQ */ @@ -317,7 +315,7 @@ PRINT(d, msgsize); #endif /* WITH_MSQ */ if (stats) { - PRINT(d, nqueued); + PRINT(d, inflight); PRINT(d, nsent); PRINT(d, nrecv); PRINT(d, ntimeout); @@ -326,6 +324,7 @@ PRINT(p, aioq.head); PRINT(p, aioq.tail); PRINT(d, nqueued); + PRINT(d, ndelay); #undef PRINT } } @@ -358,15 +357,18 @@ static inline void aioqInit(AIOQ_t aioq) { + rpmmsq msq = NULL; RPM_GNUC_TM_ATOMIC { aioq->head = NULL; aioq->tail = &aioq->head; } +SPEW("<--\t%s(%p)\n", __FUNCTION__, aioq); } static inline rpmioItem aioqGetHead(AIOQ_t aioq) { + rpmmsq msq = NULL; rpmioItem item; RPM_GNUC_TM_ATOMIC { if ((item = aioq->head) != NULL) { @@ -376,25 +378,43 @@ aioq->tail = &aioq->head; } } +if (item) +SPEW("<--\t%s(%p) item %p\n", __FUNCTION__, aioq, item); return item; } static inline void aioqPutTail(AIOQ_t aioq, rpmioItem item) { + rpmmsq msq = NULL; RPM_GNUC_TM_ATOMIC { *aioq->tail = item; aioq->tail = (rpmioItem *) &item->next; } +SPEW("<--\t%s(%p,%p)\n", __FUNCTION__, aioq, item); } static inline -void aioqPutWait(AIOQ_t aioq, rpmioItem item, rpmzLog zlog) +ssize_t aioqPutWait(AIOQ_t aioq, rpmioItem item, rpmzLog zlog, + unsigned long *priop) { + rpmmsq msq = NULL; + ssize_t rc; + POSSESS(item->use); aioqPutTail(aioq, item); WAITFOR(item->use, TO_BE, 1); RELEASE(item->use); + + rpmaio aio = (rpmaio) item; + rc = aio_return(&aio->cb); + if (rc >= 0 && priop) + *priop = aio->cb.aio_reqprio; /* XXX prio */ + if (rc < 0) + errno = aio_error(&aio->cb); /* XXX W2DO? */ + +SPEW("<--\t%s(%p,%p) rc %ld\n", __FUNCTION__, aioq, item, rc); + return rc; } /* =============================================================== */ @@ -431,7 +451,7 @@ PRINT(ld, mtype); PRINT(d, msgmax); PRINT(d, msgsize); - PRINT(d, nqueued); + PRINT(d, inflight); PRINT(d, nsent); PRINT(d, nrecv); PRINT(d, ntimeout); @@ -454,6 +474,7 @@ PRINT(p, aioq.head); PRINT(p, aioq.tail); PRINT(d, nqueued); + PRINT(d, ndelay); #undef PRINT_SIGEV #undef PRINT_ATTRS @@ -474,7 +495,7 @@ rpmmsq msq = (rpmmsq) _msq; if (msq) { -SPEW("%s: queued %d sent %d recv %d timeout %d again %d\n", __FUNCTION__, msq->nqueued, msq->nsent, msq->nrecv, msq->ntimeout, msq->nagain); +SPEW("%s: inflight %d queued %d sent %d recv %d timeout %d again %d delay %d\n", __FUNCTION__, msq->inflight, msq->nqueued, msq->nsent, msq->nrecv, msq->ntimeout, msq->nagain, msq->ndelay); msq->msgmax = MSQ_MSGMAX; msq->msgsize = MSQ_MSGSIZE; @@ -503,6 +524,8 @@ __FUNCTION__, __FILE__, __LINE__); } msq->nqueued = 0; + msq->inflight = 0; + msq->ndelay = 0; if (msq->zlog) msq->zlog = rpmzLogDump(msq->zlog, NULL); @@ -617,6 +640,8 @@ /* XXX TODO: use POSIX shared mutexes? */ aioqInit(&msq->aioq); msq->nqueued = 0; + msq->inflight = 0; + msq->ndelay = 0; msq->nsent = 0; msq->nrecv = 0; msq->ntimeout = 0; @@ -739,19 +764,16 @@ __FUNCTION__, __FILE__, __LINE__); /* Queue the read. */ - aioqPutWait(&msq->aioq, item, msq->zlog); - - rc = aio->cb.__return_value; /* XXX aio_return */ + rc = aioqPutWait(&msq->aioq, item, msq->zlog, priop); +#ifdef DYING if (rc >= 0) prio = aio->cb.aio_reqprio; /* XXX prio */ - else - errno = aio_error(&aio->cb); +#endif + aio = rpmaioFree(aio); - if (rc >= 0) { - if (priop) - *priop = prio; - } + if (rc >= 0 && priop) + *priop = prio; #endif /* WITH_MQ */ } break; @@ -796,12 +818,13 @@ rc = Mq_send(msq->qid, buf, count, prio); if (rc == 0) msq->nsent++; + msq->inflight++; /* Yield the CPU. */ sched_yield(); struct timespec ts = { 0, 100*1000 }; Z(nanosleep(&ts, NULL)); - /* Wait for the reader thread. */ + } else { rpmaio aio = rpmaioNew(msq->qid, LIO_WRITE, prio, (char *)buf, count); aio->ix = ++msq->nqueued; @@ -811,11 +834,8 @@ __FUNCTION__, __FILE__, __LINE__); /* Queue the write. */ - aioqPutWait(&msq->aioq, item, msq->zlog); + rc = aioqPutWait(&msq->aioq, item, msq->zlog, NULL); - rc = aio->cb.__return_value; /* XXX aio_return */ - if (rc < 0) - errno = aio_error(&aio->cb); aio = rpmaioFree(aio); } @@ -889,6 +909,7 @@ msq->qid = -1; break; } + unsigned prio = 0; rpmaio aio = rpmaioNew(msq->qid, LIO_CLOSE, prio, NULL, 0); aio->ix = ++msq->nqueued; @@ -898,11 +919,13 @@ __FUNCTION__, __FILE__, __LINE__); /* Queue the write. */ - aioqPutWait(&msq->aioq, item, msq->zlog); + rc = aioqPutWait(&msq->aioq, item, msq->zlog, NULL); + + /* Yield the CPU. */ + sched_yield(); + struct timespec ts = { 0, 100*1000 }; + Z(nanosleep(&ts, NULL)); - rc = aio->cb.__return_value; /* XXX aio_return */ - if (rc < 0) - errno = aio_error(&aio->cb); aio = rpmaioFree(aio); } @@ -1088,14 +1111,20 @@ rpmioItem item; /* Get next request from queue. */ - if ((item = aioqGetHead(&msq->aioq)) == NULL) + if ((item = aioqGetHead(&msq->aioq)) == NULL) { + msq->ndelay++; + /* Yield the CPU. */ + sched_yield(); + struct timespec ts = { 0, 100*1000 }; + Z(nanosleep(&ts, NULL)); break; + } rpmaio aio = (rpmaio) item; char *b = (char *)aio->cb.aio_buf; size_t nb = aio->cb.aio_nbytes; - errno = 0; + rc = -1; switch (aio->cb.aio_lio_opcode) { case LIO_READ: { unsigned int prio = 0; @@ -1104,6 +1133,8 @@ msq->nrecv++; aio->cb.aio_reqprio = prio; /* XXX prio */ } + if (msq->inflight > 0) + msq->inflight--; } break; case LIO_WRITE: { unsigned int prio = aio->cb.aio_reqprio; /* XXX prio */ @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmsq.h ============================================================================ $ cvs diff -u -r1.1.2.14 -r1.1.2.15 rpmmsq.h --- rpm/rpmio/rpmmsq.h 28 May 2017 20:18:16 -0000 1.1.2.14 +++ rpm/rpmio/rpmmsq.h 28 May 2017 22:14:55 -0000 1.1.2.15 @@ -54,6 +54,8 @@ int msgmax; /*!< max. inflight messages. */ int msgsize; /*!< max. message size. */ + int inflight; + int ndelay; int nqueued; /*!< no. messages queued. */ int nsent; /*!< no. messages sent. */ int nrecv; /*!< no. messages received. */ @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository rpm-cvs@rpm5.org