RPM Package Manager, CVS Repository
http://rpm5.org/cvs/
____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: j...@rpm5.org Module: rpm Date: 29-May-2017 08:57:54 Branch: rpm-5_4 Handle: 2017052906575400 Modified files: (Branch: rpm-5_4) rpm/rpmio msqio.c rpmmsq.h Log: - msqio: haul out the trash. Summary: Revision Changes Path 1.1.2.19 +106 -126 rpm/rpmio/msqio.c 1.1.2.16 +0 -2 rpm/rpmio/rpmmsq.h ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/rpmio/msqio.c ============================================================================ $ cvs diff -u -r1.1.2.18 -r1.1.2.19 msqio.c --- rpm/rpmio/msqio.c 28 May 2017 22:14:55 -0000 1.1.2.18 +++ rpm/rpmio/msqio.c 29 May 2017 06:57:54 -0000 1.1.2.19 @@ -32,6 +32,8 @@ typedef struct AIOQ_s * AIOQ_t; struct AIOQ_s { + yarnLock have; + void * pool; rpmioItem head; /*!< AIO cb queue head. */ rpmioItem * tail; /*!< AIO cb queue tail. */ }; @@ -274,7 +276,7 @@ if (rc == 0) { /* Ensure printable. */ for (nc = 0; nc < ncmax; nc++) { - if (nc < rc && isprint(buf[nc])) + if (nc < (int)msgsz && isprint(buf[nc])) continue; break; } @@ -315,7 +317,6 @@ PRINT(d, msgsize); #endif /* WITH_MSQ */ if (stats) { - PRINT(d, inflight); PRINT(d, nsent); PRINT(d, nrecv); PRINT(d, ntimeout); @@ -324,14 +325,13 @@ PRINT(p, aioq.head); PRINT(p, aioq.tail); PRINT(d, nqueued); - PRINT(d, ndelay); #undef PRINT } } /* =============================================================== */ static int _lockdebug = 0; -static int _conddebug = -1; +static int _conddebug = 0; #define PEEK(_bolt) yarnPeekLock(_bolt) #define POSSESS(_bolt) \ { yarnPossess(_bolt); \ @@ -345,7 +345,7 @@ } #define TWIST(_bolt, _op, _val) \ { if (zlog && _conddebug) \ - rpmzlogAdd(zlog, "***\t TWIST(%p, %d, %d)\t%ld", _bolt, _op, _val, PEEK(_bolt)); \ + rpmzLogAdd(zlog, "***\t TWIST(%p, %d, %d)\t%ld", _bolt, _op, _val, PEEK(_bolt)); \ yarnTwist(_bolt, _op, _val); \ } #define WAITFOR(_bolt, _op, _val) \ @@ -359,6 +359,8 @@ { rpmmsq msq = NULL; RPM_GNUC_TM_ATOMIC { + aioq->have = 
yarnNewLock(0); + aioq->pool = NULL; aioq->head = NULL; aioq->tail = &aioq->head; } @@ -379,7 +381,7 @@ } } if (item) -SPEW("<--\t%s(%p) item %p\n", __FUNCTION__, aioq, item); +SPEW("<--\t%s(%p) %p\n", __FUNCTION__, aioq, item); return item; } @@ -391,29 +393,43 @@ *aioq->tail = item; aioq->tail = (rpmioItem *) &item->next; } -SPEW("<--\t%s(%p,%p)\n", __FUNCTION__, aioq, item); +SPEW("<--\t%s(%p, %p)\n", __FUNCTION__, aioq, item); } static inline -ssize_t aioqPutWait(AIOQ_t aioq, rpmioItem item, rpmzLog zlog, - unsigned long *priop) +ssize_t aioqPutWait(rpmmsq msq, unsigned long *priop, + int op, int prio, void *b, size_t nb) { - rpmmsq msq = NULL; + AIOQ_t aioq = &msq->aioq; + rpmzLog zlog = msq->zlog; ssize_t rc; - POSSESS(item->use); + rpmaio aio = rpmaioNew(msq->qid, op, prio, b, nb); + aio->ix = ++msq->nqueued; + + rpmioItem item = rpmioLinkPoolItem(&aio->_item, + __FUNCTION__, __FILE__, __LINE__); + + /* Producer monitor. */ + POSSESS(aioq->have); + WAITFOR(aioq->have, NOT_TO_BE, msq->msgmax); aioqPutTail(aioq, item); + TWIST(aioq->have, BY, 1); + + /* Wait for AIO completion. */ + POSSESS(item->use); WAITFOR(item->use, TO_BE, 1); RELEASE(item->use); - rpmaio aio = (rpmaio) item; rc = aio_return(&aio->cb); if (rc >= 0 && priop) *priop = aio->cb.aio_reqprio; /* XXX prio */ if (rc < 0) errno = aio_error(&aio->cb); /* XXX W2DO? 
*/ -SPEW("<--\t%s(%p,%p) rc %ld\n", __FUNCTION__, aioq, item, rc); + aio = rpmaioFree(aio); + +SPEW("<--\t%s(%p, %p) rc %ld\n", __FUNCTION__, aioq, item, rc); return rc; } @@ -451,7 +467,6 @@ PRINT(ld, mtype); PRINT(d, msgmax); PRINT(d, msgsize); - PRINT(d, inflight); PRINT(d, nsent); PRINT(d, nrecv); PRINT(d, ntimeout); @@ -474,7 +489,6 @@ PRINT(p, aioq.head); PRINT(p, aioq.tail); PRINT(d, nqueued); - PRINT(d, ndelay); #undef PRINT_SIGEV #undef PRINT_ATTRS @@ -495,7 +509,7 @@ rpmmsq msq = (rpmmsq) _msq; if (msq) { -SPEW("%s: inflight %d queued %d sent %d recv %d timeout %d again %d delay %d\n", __FUNCTION__, msq->inflight, msq->nqueued, msq->nsent, msq->nrecv, msq->ntimeout, msq->nagain, msq->ndelay); +SPEW("%s: queued %d sent %d recv %d timeout %d again %d\n", __FUNCTION__, msq->nqueued, msq->nsent, msq->nrecv, msq->ntimeout, msq->nagain); msq->msgmax = MSQ_MSGMAX; msq->msgsize = MSQ_MSGSIZE; @@ -514,18 +528,25 @@ msq->key = 0; msq->mtype = 0; + AIOQ_t aioq = &msq->aioq; + rpmzLog zlog = msq->zlog; + rpmioItem item; + /* Drain the queue. */ + POSSESS(aioq->have); /* XXX enter */ while (1) { - rpmioItem item; - if ((item = aioqGetHead(&msq->aioq)) == NULL) + if (PEEK(aioq->have) == 0) break; - while (item) + item = aioqGetHead(&msq->aioq); + TWIST(aioq->have, BY, -1); /* XXX exit */ + while (item) /* XXX deadlock w msq ref */ item = rpmioFreePoolItem(item, __FUNCTION__, __FILE__, __LINE__); + POSSESS(aioq->have); /* XXX re-enter */ } + RELEASE(aioq->have); /* XXX exit */ + aioq->have = yarnFreeLock(aioq->have); msq->nqueued = 0; - msq->inflight = 0; - msq->ndelay = 0; if (msq->zlog) msq->zlog = rpmzLogDump(msq->zlog, NULL); @@ -640,8 +661,6 @@ /* XXX TODO: use POSIX shared mutexes? 
*/ aioqInit(&msq->aioq); msq->nqueued = 0; - msq->inflight = 0; - msq->ndelay = 0; msq->nsent = 0; msq->nrecv = 0; msq->ntimeout = 0; @@ -755,25 +774,9 @@ case RPMMSQ_TYPE_POSIX: { #if defined(WITH_MQ) - unsigned int prio = 0; - - rpmaio aio = rpmaioNew(msq->qid, LIO_READ, 0, buf, count); - aio->ix = ++msq->nqueued; - - rpmioItem item = rpmioLinkPoolItem(&aio->_item, - __FUNCTION__, __FILE__, __LINE__); - /* Queue the read. */ - rc = aioqPutWait(&msq->aioq, item, msq->zlog, priop); -#ifdef DYING - if (rc >= 0) - prio = aio->cb.aio_reqprio; /* XXX prio */ -#endif - - aio = rpmaioFree(aio); - - if (rc >= 0 && priop) - *priop = prio; + rc = aioqPutWait(msq, priop, + LIO_READ, 0, buf, count); #endif /* WITH_MQ */ } break; @@ -813,32 +816,21 @@ case RPMMSQ_TYPE_POSIX: { #if defined(WITH_MQ) - + /* Send the first message to trigger the reader thread startup. */ if (msq->nsent == 0) { rc = Mq_send(msq->qid, buf, count, prio); if (rc == 0) msq->nsent++; - msq->inflight++; /* Yield the CPU. */ sched_yield(); struct timespec ts = { 0, 100*1000 }; Z(nanosleep(&ts, NULL)); - } else { - rpmaio aio = rpmaioNew(msq->qid, LIO_WRITE, prio, (char *)buf, count); - aio->ix = ++msq->nqueued; - aio->cb.aio_reqprio = prio; /* XXX prio */ - - rpmioItem item = rpmioLinkPoolItem(&aio->_item, - __FUNCTION__, __FILE__, __LINE__); - /* Queue the write. */ - rc = aioqPutWait(&msq->aioq, item, msq->zlog, NULL); - - aio = rpmaioFree(aio); + rc = aioqPutWait(msq, NULL, + LIO_WRITE, 0, (char *)buf, count); } - if (rc == 0) /* XXX remap to write(2) return */ rc = count; #endif /* WITH_MQ */ @@ -854,9 +846,8 @@ if (count) memcpy(msgp->mtext, buf, count); rc = Msgsnd(msq->qid, msgp, msgsz, msgflg); - if (rc == 0) { + if (rc == 0) msq->nsent++; - } msgp = _free(msgp); if (rc == 0) /* XXX remap to write(2) return */ @@ -902,32 +893,23 @@ if (MSQF_ISSET(INFO) || _rpmmsq_debug) rpmmsqDump(NULL, msq, NULL); + /* Reader thread never started: disable notify and close. 
*/ if (msq->nsent == 0) { rc = rpmmsqNotify(msq, NULL); rc = Mq_close(msq->qid); if (rc == 0) msq->qid = -1; break; + } else { + /* Queue the close. */ + rc = aioqPutWait(msq, NULL, + LIO_CLOSE, 0, NULL, 0); } - unsigned prio = 0; - rpmaio aio = rpmaioNew(msq->qid, LIO_CLOSE, prio, NULL, 0); - aio->ix = ++msq->nqueued; - aio->cb.aio_reqprio = prio; /* XXX prio */ - - rpmioItem item = rpmioLinkPoolItem(&aio->_item, - __FUNCTION__, __FILE__, __LINE__); - - /* Queue the write. */ - rc = aioqPutWait(&msq->aioq, item, msq->zlog, NULL); - /* Yield the CPU. */ sched_yield(); - struct timespec ts = { 0, 100*1000 }; + struct timespec ts = { 0, 10*1000*1000 }; Z(nanosleep(&ts, NULL)); - - aio = rpmaioFree(aio); - } if (!rc && (delete || MSQF_ISSET(DELETE))) @@ -943,6 +925,7 @@ rpmmsqDump(__FUNCTION__, msq, NULL); } msq->qid = -1; + rc = 0; /* XXX */ if (delete || MSQF_ISSET(DELETE)) rc = Msgctl(msq->qid, IPC_RMID, NULL); #endif /* WITH_MSQ */ @@ -1105,64 +1088,61 @@ SPEW("==> %s(%p) qid %d\n", __FUNCTION__, msq, msq->qid); ssize_t rc; - while (msq->qid != -1) { - - while (1) { - rpmioItem item; - - /* Get next request from queue. */ - if ((item = aioqGetHead(&msq->aioq)) == NULL) { - msq->ndelay++; - /* Yield the CPU. */ - sched_yield(); - struct timespec ts = { 0, 100*1000 }; - Z(nanosleep(&ts, NULL)); - break; + if (msq->qid != -1) + do { + AIOQ_t aioq = &msq->aioq; + rpmzLog zlog = msq->zlog; + rpmioItem item; + + /* Consumer monitor. */ + POSSESS(aioq->have); + WAITFOR(aioq->have, NOT_TO_BE, 0); + item = aioqGetHead(&msq->aioq); + TWIST(aioq->have, BY, -1); +assert(item); + + /* Process the request. 
*/ + rpmaio aio = (rpmaio) item; + char *b = (char *)aio->cb.aio_buf; + size_t nb = aio->cb.aio_nbytes; + + rc = -1; + switch (aio->cb.aio_lio_opcode) { + case LIO_READ: + { unsigned int prio = 0; + rc = rpmmsqReaderRead(msq, b, nb, &prio); + if (rc >= 0) { + msq->nrecv++; + aio->cb.aio_reqprio = prio; /* XXX prio */ } - - rpmaio aio = (rpmaio) item; - char *b = (char *)aio->cb.aio_buf; - size_t nb = aio->cb.aio_nbytes; - - rc = -1; - switch (aio->cb.aio_lio_opcode) { - case LIO_READ: - { unsigned int prio = 0; - rc = rpmmsqReaderRead(msq, b, nb, &prio); - if (rc >= 0) { - msq->nrecv++; - aio->cb.aio_reqprio = prio; /* XXX prio */ - } - if (msq->inflight > 0) - msq->inflight--; - } break; - case LIO_WRITE: - { unsigned int prio = aio->cb.aio_reqprio; /* XXX prio */ - rc = Mq_send(msq->qid, (const char *)b, nb, prio); + } break; + case LIO_WRITE: + { unsigned int prio = aio->cb.aio_reqprio; /* XXX prio */ + rc = Mq_send(msq->qid, (const char *)b, nb, prio); + if (rc == 0) + msq->nsent++; + } break; + case LIO_CLOSE: + { + if (b == NULL && nb == 0) { + rc = Mq_close(msq->qid); if (rc == 0) - msq->nsent++; - } break; - case LIO_CLOSE: - { - if (b == NULL && nb == 0) { - rc = Mq_close(msq->qid); - if (rc == 0) - msq->qid = -1; - } - } break; - case LIO_DSYNC: - case LIO_SYNC: - case LIO_READ64: - case LIO_WRITE64: - default: - break; + msq->qid = -1; } - - aio->cb.__error_code = errno; /* XXX */ - aio->cb.__return_value = rc; /* XXX */ - aio = rpmaioFree(aio); + } break; + case LIO_NOP: + case LIO_DSYNC: + case LIO_SYNC: + case LIO_READ64: + case LIO_WRITE64: + default: + break; } - } + + aio->cb.__error_code = errno; + aio->cb.__return_value = rc; + aio = rpmaioFree(aio); + } while (msq->qid != -1); SPEW("<== %s(%p) qid %d rc %ld\n", __FUNCTION__, msq, msq->qid, rc); @@ . patch -p0 <<'@@ .' 
Index: rpm/rpmio/rpmmsq.h ============================================================================ $ cvs diff -u -r1.1.2.15 -r1.1.2.16 rpmmsq.h --- rpm/rpmio/rpmmsq.h 28 May 2017 22:14:55 -0000 1.1.2.15 +++ rpm/rpmio/rpmmsq.h 29 May 2017 06:57:54 -0000 1.1.2.16 @@ -54,8 +54,6 @@ int msgmax; /*!< max. inflight messages. */ int msgsize; /*!< max. message size. */ - int inflight; - int ndelay; int nqueued; /*!< no. messages queued. */ int nsent; /*!< no. messages sent. */ int nrecv; /*!< no. messages received. */ @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository rpm-cvs@rpm5.org