RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: j...@rpm5.org Module: rpm Date: 28-May-2017 22:18:16 Branch: rpm-5_4 Handle: 2017052820181501 Modified files: (Branch: rpm-5_4) rpm CHANGES rpm/rpmio msqio.c rpmaio.c rpmmsq.h Log: - msqio: abstract out the AIO queue (AIOQ_t for now). Summary: Revision Changes Path 1.3501.2.566+1 -0 rpm/CHANGES 1.1.2.17 +87 -97 rpm/rpmio/msqio.c 1.1.2.2 +1 -1 rpm/rpmio/rpmaio.c 1.1.2.14 +1 -3 rpm/rpmio/rpmmsq.h ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/CHANGES ============================================================================ $ cvs diff -u -r1.3501.2.565 -r1.3501.2.566 CHANGES --- rpm/CHANGES 28 May 2017 18:39:22 -0000 1.3501.2.565 +++ rpm/CHANGES 28 May 2017 20:18:15 -0000 1.3501.2.566 @@ -1,4 +1,5 @@ 5.4.17 -> 5.4.18: + - jbj: msqio: abstract out the AIO queue (AIOQ_t for now). - jbj: rpmaio: stub in an AIO aiocb pool. - jbj: msqio: replace pthreads monitor with yarn. - jbj: rpmio: wire up the fooInit callback, pass the message to fooDbug. @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/msqio.c ============================================================================ $ cvs diff -u -r1.1.2.16 -r1.1.2.17 msqio.c --- rpm/rpmio/msqio.c 28 May 2017 18:39:22 -0000 1.1.2.16 +++ rpm/rpmio/msqio.c 28 May 2017 20:18:16 -0000 1.1.2.17 @@ -29,6 +29,13 @@ #define _RPMAIO_INTERNAL #include "rpmaio.h" + +typedef struct AIOQ_s * AIOQ_t; +struct AIOQ_s { + rpmioItem head; /*!< AIO cb queue head. */ + rpmioItem * tail; /*!< AIO cb queue tail. */ +}; + #define _RPMMSQ_INTERNAL #include "rpmmsq.h" @@ -47,30 +54,6 @@ #define Z(_rc) assert((_rc) == 0) -static int _lockdebug = 0; -static int _conddebug = -1; -#define PEEK(_bolt) yarnPeekLock(_bolt) -#define POSSESS(_bolt) \ - { yarnPossess(_bolt); \ - if (_lockdebug) \ - rpmzLogAdd(msq->zlog, "***\t POSSESS(%p)\t%ld", _bolt, PEEK(_bolt)); \ - } -#define RELEASE(_bolt) \ - { if (_lockdebug) \ - rpmzLogAdd(msq->zlog, "***\t RELEASE(%p)\t%ld", _bolt, PEEK(_bolt)); \ - yarnRelease(_bolt); \ - } -#define TWIST(_bolt, _op, _val) \ - { if (_conddebug) \ - rpmzlogAdd(msq->zlog, "***\t TWIST(%p, %d, %d)\t%ld", _bolt, _op, _val, PEEK(_bolt)); \ - yarnTwist(_bolt, _op, _val); \ - } -#define WAITFOR(_bolt, _op, _val) \ - { if (_conddebug) \ - rpmzLogAdd(msq->zlog, "***\t WAITFOR(%p, %d, %d)\t%ld", _bolt, _op, _val, PEEK(_bolt));\ - yarnWaitFor(_bolt, _op, _val); \ - } - #ifdef __cplusplus GENfree(rpmmsq) #endif /* __cplusplus */ @@ -340,14 +323,81 @@ PRINT(d, ntimeout); PRINT(d, nagain); } - PRINT(p, head); - PRINT(p, tail); + PRINT(p, aioq.head); + PRINT(p, aioq.tail); PRINT(d, nqueued); #undef PRINT } } /* =============================================================== */ +static int _lockdebug = 0; +static int _conddebug = -1; +#define PEEK(_bolt) yarnPeekLock(_bolt) +#define POSSESS(_bolt) \ + { yarnPossess(_bolt); \ + if (zlog && _lockdebug) \ + rpmzLogAdd(zlog, "***\t POSSESS(%p)\t%ld", _bolt, PEEK(_bolt)); \ + } +#define RELEASE(_bolt) \ + { if (zlog && _lockdebug) \ + rpmzLogAdd(zlog, "***\t RELEASE(%p)\t%ld", _bolt, PEEK(_bolt)); \ + yarnRelease(_bolt); \ + } +#define TWIST(_bolt, _op, _val) \ + { if (zlog && _conddebug) \ + rpmzlogAdd(zlog, "***\t TWIST(%p, %d, %d)\t%ld", _bolt, _op, _val, PEEK(_bolt)); \ + yarnTwist(_bolt, _op, _val); \ + } +#define WAITFOR(_bolt, _op, _val) \ + { if (zlog && _conddebug) \ + rpmzLogAdd(zlog, "***\t WAITFOR(%p, %d, %d)\t%ld", _bolt, _op, _val, PEEK(_bolt));\ + yarnWaitFor(_bolt, _op, _val); \ + } + +static inline +void aioqInit(AIOQ_t aioq) +{ + RPM_GNUC_TM_ATOMIC { + aioq->head = NULL; + aioq->tail = &aioq->head; + } +} + +static inline +rpmioItem aioqGetHead(AIOQ_t aioq) +{ + rpmioItem item; + RPM_GNUC_TM_ATOMIC { + if ((item = aioq->head) != NULL) { + aioq->head = item->next; + item->next = NULL; /* XXX rpmmalloc.c ? */ + if (aioq->head == NULL) + aioq->tail = &aioq->head; + } + } + return item; +} + +static inline +void aioqPutTail(AIOQ_t aioq, rpmioItem item) +{ + RPM_GNUC_TM_ATOMIC { + *aioq->tail = item; + aioq->tail = (rpmioItem *) &item->next; + } +} + +static inline +void aioqPutWait(AIOQ_t aioq, rpmioItem item, rpmzLog zlog) +{ + POSSESS(item->use); + aioqPutTail(aioq, item); + WAITFOR(item->use, TO_BE, 1); + RELEASE(item->use); +} + +/* =============================================================== */ static char * rpmmsqDbug(void *_msq, char *b, size_t nb) { rpmmsq msq = (rpmmsq) _msq; @@ -401,8 +451,8 @@ PRINT_SIGEV(p, sigev_notify_attributes); break; } - PRINT(p, head); - PRINT(p, tail); + PRINT(p, aioq.head); + PRINT(p, aioq.tail); PRINT(d, nqueued); #undef PRINT_SIGEV @@ -446,15 +496,8 @@ /* Drain the queue. */ while (1) { rpmioItem item; - RPM_GNUC_TM_ATOMIC { - if ((item = msq->head) == NULL) - break; - item = msq->head; - msq->head = item->next; - item->next = NULL; /* XXX rpmmalloc.c ? */ - if (msq->head == NULL) - msq->tail = &msq->head; - } + if ((item = aioqGetHead(&msq->aioq)) == NULL) + break; while (item) item = rpmioFreePoolItem(item, __FUNCTION__, __FILE__, __LINE__); @@ -571,11 +614,8 @@ msq->msgmax = MSQ_MSGMAX; msq->msgsize = MSQ_MSGSIZE; - /* XXX TODO: use POSIX shared mutexes. */ - RPM_GNUC_TM_ATOMIC { - msq->head = NULL; - msq->tail = &msq->head; - } + /* XXX TODO: use POSIX shared mutexes? */ + aioqInit(&msq->aioq); msq->nqueued = 0; msq->nsent = 0; msq->nrecv = 0; @@ -699,13 +739,7 @@ __FUNCTION__, __FILE__, __LINE__); /* Queue the read. */ - POSSESS(item->use); - RPM_GNUC_TM_ATOMIC { - *msq->tail = item; - msq->tail = (rpmioItem *) &item->next; - } - WAITFOR(item->use, TO_BE, 1); - RELEASE(item->use); + aioqPutWait(&msq->aioq, item, msq->zlog); rc = aio->cb.__return_value; /* XXX aio_return */ if (rc >= 0) @@ -777,13 +811,7 @@ __FUNCTION__, __FILE__, __LINE__); /* Queue the write. */ - POSSESS(item->use); - RPM_GNUC_TM_ATOMIC { - *msq->tail = item; - msq->tail = (rpmioItem *) &item->next; - } - WAITFOR(item->use, TO_BE, 1); - RELEASE(item->use); + aioqPutWait(&msq->aioq, item, msq->zlog); rc = aio->cb.__return_value; /* XXX aio_return */ if (rc < 0) @@ -854,31 +882,6 @@ if (MSQF_ISSET(INFO) || _rpmmsq_debug) rpmmsqDump(NULL, msq, NULL); -#ifdef DYING - /* Wait for the queue to drain. */ - while (1) { - rpmioItem item; - RPM_GNUC_TM_ATOMIC { - if ((item = msq->head) == NULL) - break; - } - POSSESS(item->use); - WAITFOR(item->use, TO_BE, 0); - RELEASE(item->use); - } - - /* Turn off the sigev detached thread. */ - int qid = msq->qid; - msq->qid = -1; - rc = rpmmsqNotify(msq, NULL); - - /* Yield the CPU. */ - sched_yield(); - struct timespec ts = { 0, 100*1000 }; - Z(nanosleep(&ts, NULL)); - /* Wait for the reader thread. */ - -#else if (msq->nsent == 0) { rc = rpmmsqNotify(msq, NULL); rc = Mq_close(msq->qid); @@ -895,19 +898,12 @@ __FUNCTION__, __FILE__, __LINE__); /* Queue the write. */ - POSSESS(item->use); - RPM_GNUC_TM_ATOMIC { - *msq->tail = item; - msq->tail = (rpmioItem *) &item->next; - } - WAITFOR(item->use, TO_BE, 1); - RELEASE(item->use); + aioqPutWait(&msq->aioq, item, msq->zlog); rc = aio->cb.__return_value; /* XXX aio_return */ if (rc < 0) errno = aio_error(&aio->cb); aio = rpmaioFree(aio); -#endif } @@ -1092,14 +1088,8 @@ rpmioItem item; /* Get next request from queue. */ - RPM_GNUC_TM_ATOMIC { - if ((item = msq->head) == NULL) - break; - msq->head = item->next; - item->next = NULL; /* XXX rpmmalloc.c ? */ - if (msq->head == NULL) - msq->tail = &msq->head; - } + if ((item = aioqGetHead(&msq->aioq)) == NULL) + break; rpmaio aio = (rpmaio) item; char *b = (char *)aio->cb.aio_buf; @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmaio.c ============================================================================ $ cvs diff -u -r1.1.2.1 -r1.1.2.2 rpmaio.c --- rpm/rpmio/rpmaio.c 28 May 2017 18:39:22 -0000 1.1.2.1 +++ rpm/rpmio/rpmaio.c 28 May 2017 20:18:16 -0000 1.1.2.2 @@ -27,7 +27,7 @@ #define Z(_rc) assert((_rc) == 0) -#define PEEK(_bolt) yarnPeekLock(_bolt) +#define PEEK(_bolt) yarnPeekLock(_bolt) #ifdef __cplusplus GENfree(rpmaio) @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmsq.h ============================================================================ $ cvs diff -u -r1.1.2.13 -r1.1.2.14 rpmmsq.h --- rpm/rpmio/rpmmsq.h 28 May 2017 18:39:22 -0000 1.1.2.13 +++ rpm/rpmio/rpmmsq.h 28 May 2017 20:18:16 -0000 1.1.2.14 @@ -39,8 +39,7 @@ #if defined(_RPMMSQ_INTERNAL) struct rpmmsq_s { struct rpmioItem_s _item; /*!< usage mutex and pool identifier. */ - rpmioItem head; /*!< message queue head. */ - rpmioItem * tail; /*!< message queue tail. */ + struct AIOQ_s aioq; /*!< message inflight queue */ const char * qname; /*!> message queue path. */ const char * fmode; int fdno; @@ -71,7 +70,6 @@ struct msginfo mi; #endif /* WITH_MSQ */ - rpmzLog zlog; }; #endif /* _RPMMSQ_INTERNAL */ @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository rpm-cvs@rpm5.org